From 8386b55e9c0346b81a75e73171e5888e092c1e8f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 10 Sep 2021 20:01:03 +0200 Subject: [PATCH 1/4] query_class_config: remove query::max_result_size default constructor The default values for the fields of this class didn't make much sense, and the default constructor was used only in a single place so removing it is trivial. It's safer when the user is forced to supply the limits. --- query_class_config.hh | 6 +++--- test/perf/perf_row_cache_reads.cc | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/query_class_config.hh b/query_class_config.hh index a8c02ec18e..8d88b03756 100644 --- a/query_class_config.hh +++ b/query_class_config.hh @@ -28,10 +28,10 @@ class reader_concurrency_semaphore; namespace query { struct max_result_size { - uint64_t soft_limit = 0; - uint64_t hard_limit = 0; + uint64_t soft_limit; + uint64_t hard_limit; - max_result_size() = default; + max_result_size() = delete; explicit max_result_size(uint64_t max_size) : soft_limit(max_size), hard_limit(max_size) { } explicit max_result_size(uint64_t soft_limit, uint64_t hard_limit) : soft_limit(soft_limit), hard_limit(hard_limit) { } }; diff --git a/test/perf/perf_row_cache_reads.cc b/test/perf/perf_row_cache_reads.cc index 99b3670022..03899d61d4 100644 --- a/test/perf/perf_row_cache_reads.cc +++ b/test/perf/perf_row_cache_reads.cc @@ -211,7 +211,7 @@ void test_scan_with_range_delete_over_rows() { std::numeric_limits::max(), std::numeric_limits::max(), gc_clock::now(), - query::max_result_size()).get(); + query::max_result_size(query::result_memory_limiter::unlimited_result_size)).get(); }); slm.stop(); From fbb83dd5cafa4142cd3b756e43e827e0a37a0c65 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 13 Sep 2021 16:38:07 +0200 Subject: [PATCH 2/4] reader_concurrency_semaphore: remove default parameter values from constructors It's easy to forget about supplying the correct value for a parameter when it has a default value specified. It's safer if 'production code' is forced to always supply these parameters manually. The default values were mostly useful in tests, where some parameters didn't matter that much and where the majority of uses of the class are. Without default values adding a new parameter is a pain, forcing one to modify every usage in the tests - and there are a bunch of them. To solve this, we introduce a new constructor which requires passing the `for_tests` tag, marking that the constructor is only supposed to be used in tests (and the constructor has an appropriate comment). This constructor uses default values, but the other constructors - used in 'production code' - do not. --- database.cc | 6 ++++-- reader_concurrency_semaphore.cc | 3 ++- reader_concurrency_semaphore.hh | 13 ++++++++++++- test/boost/mutation_fragment_test.cc | 2 +- test/boost/mutation_reader_test.cc | 2 +- test/boost/querier_cache_test.cc | 2 +- test/boost/reader_concurrency_semaphore_test.cc | 16 ++++++++-------- test/boost/view_build_test.cc | 2 +- test/lib/reader_lifecycle_policy.hh | 2 +- 9 files changed, 31 insertions(+), 17 deletions(-) diff --git a/database.cc b/database.cc index 4ec45c4ad7..222fd376d6 100644 --- a/database.cc +++ b/database.cc @@ -340,14 +340,16 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _streaming_concurrency_sem( max_count_streaming_concurrent_reads, max_memory_streaming_concurrent_reads(), - "_streaming_concurrency_sem") + "_streaming_concurrency_sem", + std::numeric_limits::max()) // No limits, just for accounting. , _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction") , _system_read_concurrency_sem( // Using higher initial concurrency, see revert_initial_system_read_concurrency_boost(). max_count_concurrent_reads, max_memory_system_concurrent_reads(), - "_system_read_concurrency_sem") + "_system_read_concurrency_sem", + std::numeric_limits::max()) , _row_cache_tracker(cache_tracker::register_metrics::yes) , _apply_stage("db_apply", &database::do_apply) , _version(empty_version) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 48a3cc7ca4..17200866ca 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -625,7 +625,8 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na : reader_concurrency_semaphore( std::numeric_limits::max(), std::numeric_limits::max(), - std::move(name)) {} + std::move(name), + std::numeric_limits::max()) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { if (!_stats.total_permits) { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 455ebd551e..de8af04a5d 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -239,7 +239,7 @@ public: reader_concurrency_semaphore(int count, ssize_t memory, sstring name, - size_t max_queue_length = std::numeric_limits::max()); + size_t max_queue_length); /// Create a semaphore with practically unlimited count and memory. /// @@ -247,6 +247,17 @@ public: /// The semaphore's name has to be unique! explicit reader_concurrency_semaphore(no_limits, sstring name); + /// A helper constructor *only for tests* that supplies default arguments. + /// The other constructors have default values removed so 'production-code' + /// is forced to specify all of them manually to avoid bugs. + struct for_tests{}; + reader_concurrency_semaphore(for_tests, sstring name, + int count = std::numeric_limits::max(), + ssize_t memory = std::numeric_limits::max(), + size_t max_queue_length = std::numeric_limits::max()) + : reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length) + {} + ~reader_concurrency_semaphore(); reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete; diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index ac8297757b..e1e39797cd 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -425,7 +425,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { simple_schema s; - reader_concurrency_semaphore sem(1, 100, get_name()); + reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index c0b725dc71..311ea94223 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3419,7 +3419,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) } SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { - reader_concurrency_semaphore semaphore(1, 0, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0); auto stop_sem = deferred_stop(semaphore); simple_schema s; auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index f18d47ff17..d786280ada 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -163,7 +163,7 @@ public: }; test_querier_cache(const noncopyable_function& external_make_value, std::chrono::seconds entry_ttl = 24h, ssize_t max_memory = std::numeric_limits::max()) - : _sem(std::numeric_limits::max(), max_memory, "test_querier_cache") + : _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits::max(), max_memory) , _cache(entry_ttl) , _mutations(make_mutations(_s, external_make_value)) , _mutation_source([this] (schema_ptr, reader_permit permit, const dht::partition_range& range) { diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 5112ef8ab9..c0740b98b0 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -63,7 +63,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_releases_units) { simple_schema s; const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); // Not admitted, active @@ -123,7 +123,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves simple_schema s; const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; const auto base_resources = reader_concurrency_semaphore::resources{1, 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); @@ -279,7 +279,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { #endif simple_schema s; - reader_concurrency_semaphore semaphore(count, count * 1024, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), count, count * 1024); auto stop_sem = deferred_stop(semaphore); std::vector> readers; @@ -396,7 +396,7 @@ class dummy_file_impl : public file_impl { SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { - reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024); auto stop_sem = deferred_stop(semaphore); auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get(); @@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) { SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { return async([&] () { - reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 2, new_reader_base_cost); auto stop_sem = deferred_stop(semaphore); { @@ -496,7 +496,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) { return async([&] () { - reader_concurrency_semaphore semaphore(1, new_reader_base_cost, get_name(), 2); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost, 2); auto stop_sem = deferred_stop(semaphore); { @@ -599,7 +599,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { simple_schema s; const auto schema_ptr = s.schema().get(); const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); auto require_can_admit = [&] (bool expected_can_admit, const char* description, @@ -841,7 +841,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 0); diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 74f8f940a4..4998b80744 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -796,7 +796,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { } }; - reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name()); + reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost); auto stop_sem = deferred_stop(sem); auto schema = schema_builder("ks", "cf") diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 56852d7b73..1ac0f95cb4 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -96,7 +96,7 @@ public: auto name = format("tests::reader_lifecycle_policy@{}@shard_id={}", fmt::ptr(this), shard); if (_evict_paused_readers) { // Create with no memory, so all inactive reads are immediately evicted. - _contexts[shard]->semaphore.emplace(1, 0, std::move(name)); + _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0); } else { _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name)); } From e8824986dd6553468324086a66e40514a65cad45 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 13 Sep 2021 17:34:59 +0200 Subject: [PATCH 3/4] reader_permit: make query max result size accessible from the permit This will make it easier, for example, to enforce memory limits in lower levels of the flat_mutation_reader stack. By default the size is unlimited. However, for specific queries it is possible to store a different value (for example, obtained from a `read_command` object) through a setter. --- reader_concurrency_semaphore.cc | 17 +++++++++++++++++ reader_permit.hh | 4 ++++ 2 files changed, 21 insertions(+) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 17200866ca..50ae2dd0af 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -93,6 +93,7 @@ class reader_permit::impl uint64_t _blocked_branches = 0; bool _marked_as_blocked = false; db::timeout_clock::time_point _timeout; + query::max_result_size _max_result_size{query::result_memory_limiter::unlimited_result_size}; private: void on_permit_used() { @@ -320,6 +321,14 @@ public: } _timeout = timeout; } + + query::max_result_size max_result_size() const { + return _max_result_size; + } + + void set_max_result_size(query::max_result_size s) { + _max_result_size = std::move(s); + } }; static_assert(std::is_nothrow_copy_constructible_v); @@ -412,6 +421,14 @@ void reader_permit::set_timeout(db::timeout_clock::time_point timeout) noexcept _impl->set_timeout(timeout); } +query::max_result_size reader_permit::max_result_size() const { + return _impl->max_result_size(); +} + +void reader_permit::set_max_result_size(query::max_result_size s) { + _impl->set_max_result_size(std::move(s)); +} + std::ostream& operator<<(std::ostream& os, reader_permit::state s) { switch (s) { case reader_permit::state::waiting: diff --git a/reader_permit.hh b/reader_permit.hh index dc9d4a094e..8a0c85e659 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -26,6 +26,7 @@ #include "db/timeout_clock.hh" #include "schema_fwd.hh" +#include "query_class_config.hh" namespace seastar { class file; @@ -165,6 +166,9 @@ public: db::timeout_clock::time_point timeout() const noexcept; void set_timeout(db::timeout_clock::time_point timeout) noexcept; + + query::max_result_size max_result_size() const; + void set_max_result_size(query::max_result_size); }; using reader_permit_opt = optimized_optional; From c12e265eb8ca30b1291965d9c9c9b81e64c9e899 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Aug 2021 15:57:55 +0200 Subject: [PATCH 4/4] table, database: query, mutation_query: remove unnecessary class_config param The semaphore inside was never accessed and `max_memory_for_unlimited_query` was always equal to `*cmd.max_result_size` so the parameter was completely redundant. `cmd.max_result_size` is supposed to be always set in the affected functions - which are executed on the replica side - as soon as the replica receives the `read_command` object, in case the parameter was not set by the coordinator. However, we don't have a guarantee at the type level (it's still an `optional`). Many places used `*cmd.max_result_size` without even an assertion. We make the code a bit safer, we check for `cmd.max_result_size` and if it's indeed engaged, store it in `reader_permit`. We then access it from `reader_permit` where necessary. If `cmd.max_result_size` is not set, we assume this is an unlimited query and obtain the limit from `get_unlimited_query_max_result_size`. --- database.cc | 14 ++++++++------ database.hh | 2 -- query_class_config.hh | 7 ------- table.cc | 10 ++++------ 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/database.cc b/database.cc index 222fd376d6..5566e0674a 100644 --- a/database.cc +++ b/database.cc @@ -1385,7 +1385,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); - auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; + auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size(); std::optional querier_opt; lw_shared_ptr result; @@ -1397,7 +1397,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti auto read_func = [&, this] (reader_permit permit) { reader_permit::used_guard ug{permit}; - return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(), + permit.set_max_result_size(max_result_size); + return cf.query(std::move(s), std::move(permit), cmd, opts, ranges, trace_state, get_result_memory_limiter(), timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr res) { result = std::move(res); }); @@ -1442,10 +1443,10 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh } const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); - auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed); - column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); - auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; + auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size(); + auto accounter = co_await get_result_memory_limiter().new_mutation_read(max_result_size, short_read_allwoed); + column_family& cf = find_column_family(cmd.cf_id); std::optional querier_opt; reconcilable_result result; @@ -1457,7 +1458,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh auto read_func = [&, this] (reader_permit permit) { reader_permit::used_guard ug{permit}; - return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range, + permit.set_max_result_size(max_result_size); + return cf.mutation_query(std::move(s), std::move(permit), cmd, range, std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) { result = std::move(res); }); diff --git a/database.hh b/database.hh index 829958200b..45015fd28f 100644 --- a/database.hh +++ b/database.hh @@ -751,7 +751,6 @@ public: query(schema_ptr, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, @@ -779,7 +778,6 @@ public: mutation_query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, const dht::partition_range& range, tracing::trace_state_ptr trace_state, query::result_memory_accounter accounter, diff --git a/query_class_config.hh b/query_class_config.hh index 8d88b03756..e0f245d298 100644 --- a/query_class_config.hh +++ b/query_class_config.hh @@ -23,8 +23,6 @@ #include -class reader_concurrency_semaphore; - namespace query { struct max_result_size { @@ -40,9 +38,4 @@ inline bool operator==(const max_result_size& a, const max_result_size& b) { return a.soft_limit == b.soft_limit && a.hard_limit == b.hard_limit; } -struct query_class_config { - reader_concurrency_semaphore& semaphore; - max_result_size max_memory_for_unlimited_query; -}; - } diff --git a/table.cc b/table.cc index b3388b0634..6fee878607 100644 --- a/table.cc +++ b/table.cc @@ -1956,7 +1956,6 @@ future> table::query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& partition_ranges, tracing::trace_state_ptr trace_state, @@ -1981,7 +1980,8 @@ table::query(schema_ptr s, const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); auto accounter = co_await (opts.request == query::result_request::only_digest - ? memory_limiter.new_digest_read(*cmd.max_result_size, short_read_allowed) : memory_limiter.new_data_read(*cmd.max_result_size, short_read_allowed)); + ? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed) + : memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed)); query_state qs(s, cmd, opts, partition_ranges, std::move(accounter)); @@ -2001,8 +2001,7 @@ table::query(schema_ptr s, std::exception_ptr ex; try { - co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, - class_config.max_memory_for_unlimited_query); + co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, permit.max_result_size()); } catch (...) { ex = std::current_exception(); } @@ -2030,7 +2029,6 @@ future table::mutation_query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, const dht::partition_range& range, tracing::trace_state_ptr trace_state, query::result_memory_accounter accounter, @@ -2056,7 +2054,7 @@ table::mutation_query(schema_ptr s, // legacy format. auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s; auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter)); - auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query); + auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, permit.max_result_size()); if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) { co_await q.close();