diff --git a/table.cc b/table.cc index 47ee52fa7f..85a54c18cf 100644 --- a/table.cc +++ b/table.cc @@ -1943,12 +1943,21 @@ table::query(schema_ptr s, : query::data_querier(as_mutation_source(), s, class_config.semaphore.make_permit(s.get(), "data-query"), range, qs.cmd.slice, service::get_local_sstable_query_read_priority(), trace_state); + std::exception_ptr ex; + try { co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, timeout, class_config.max_memory_for_unlimited_query); if (q.are_limits_reached() || qs.builder.is_short_read()) { cache_ctx.insert(std::move(q), std::move(trace_state)); } + } catch (...) { + ex = std::current_exception(); + } + co_await q.close(); + if (ex) { + std::rethrow_exception(std::move(ex)); + } } co_return make_lw_shared(qs.builder.build()); @@ -1973,14 +1982,21 @@ table::mutation_query(schema_ptr s, : query::mutation_querier(as_mutation_source(), s, class_config.semaphore.make_permit(s.get(), "mutation-query"), range, cmd.slice, service::get_local_sstable_query_read_priority(), trace_state); + std::exception_ptr ex; + try { auto rrb = reconcilable_result_builder(*s, cmd.slice, std::move(accounter)); auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, timeout, class_config.max_memory_for_unlimited_query); if (q.are_limits_reached() || r.is_short_read()) { cache_ctx.insert(std::move(q), std::move(trace_state)); } - + co_await q.close(); co_return r; + } catch (...) { + ex = std::current_exception(); + } + co_await q.close(); + std::rethrow_exception(std::move(ex)); } mutation_source diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index 47c041c3c5..a415089ee7 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -90,6 +90,7 @@ static reconcilable_result mutation_query(schema_ptr s, const mutation_source& s uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time) { auto querier = query::mutation_querier(source, s, tests::make_permit(), range, slice, service::get_local_sstable_query_read_priority(), {}); + auto close_querier = deferred_close(querier); auto rrb = reconcilable_result_builder(*s, slice, make_accounter()); return querier.consume_page(std::move(rrb), row_limit, partition_limit, query_time, db::no_timeout, query::max_result_size(std::numeric_limits::max())).get(); @@ -538,6 +539,7 @@ SEASTAR_TEST_CASE(test_partition_limit) { static void data_query(schema_ptr s, const mutation_source& source, const dht::partition_range& range, const query::partition_slice& slice, query::result::builder& builder) { auto querier = query::data_querier(source, s, tests::make_permit(), range, slice, service::get_local_sstable_query_read_priority(), {}); + auto close_querier = deferred_close(querier); auto qrb = query_result_builder(*s, builder); querier.consume_page(std::move(qrb), std::numeric_limits::max(), std::numeric_limits::max(), gc_clock::now(), db::no_timeout, query::max_result_size(std::numeric_limits::max())).get();