diff --git a/database.cc b/database.cc index 4ec45c4ad7..5566e0674a 100644 --- a/database.cc +++ b/database.cc @@ -340,14 +340,16 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _streaming_concurrency_sem( max_count_streaming_concurrent_reads, max_memory_streaming_concurrent_reads(), - "_streaming_concurrency_sem") + "_streaming_concurrency_sem", + std::numeric_limits::max()) // No limits, just for accounting. , _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction") , _system_read_concurrency_sem( // Using higher initial concurrency, see revert_initial_system_read_concurrency_boost(). max_count_concurrent_reads, max_memory_system_concurrent_reads(), - "_system_read_concurrency_sem") + "_system_read_concurrency_sem", + std::numeric_limits::max()) , _row_cache_tracker(cache_tracker::register_metrics::yes) , _apply_stage("db_apply", &database::do_apply) , _version(empty_version) @@ -1383,7 +1385,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); - auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; + auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size(); std::optional querier_opt; lw_shared_ptr result; @@ -1395,7 +1397,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti auto read_func = [&, this] (reader_permit permit) { reader_permit::used_guard ug{permit}; - return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(), + permit.set_max_result_size(max_result_size); + return cf.query(std::move(s), std::move(permit), cmd, opts, ranges, trace_state, get_result_memory_limiter(), timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr res) { result = std::move(res); }); @@ -1440,10 +1443,10 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh } const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); - auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed); - column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); - auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; + auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size(); + auto accounter = co_await get_result_memory_limiter().new_mutation_read(max_result_size, short_read_allwoed); + column_family& cf = find_column_family(cmd.cf_id); std::optional querier_opt; reconcilable_result result; @@ -1455,7 +1458,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh auto read_func = [&, this] (reader_permit permit) { reader_permit::used_guard ug{permit}; - return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range, + permit.set_max_result_size(max_result_size); + return cf.mutation_query(std::move(s), std::move(permit), cmd, range, std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) { result = std::move(res); }); diff --git a/database.hh b/database.hh index 8ce9349d09..898df400ff 100644 --- a/database.hh +++ b/database.hh @@ -755,7 +755,6 @@ public: query(schema_ptr, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, @@ -783,7 +782,6 @@ public: mutation_query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, const dht::partition_range& range, tracing::trace_state_ptr trace_state, query::result_memory_accounter accounter, diff --git a/query_class_config.hh b/query_class_config.hh index a8c02ec18e..e0f245d298 100644 --- a/query_class_config.hh +++ b/query_class_config.hh @@ -23,15 +23,13 @@ #include -class reader_concurrency_semaphore; - namespace query { struct max_result_size { - uint64_t soft_limit = 0; - uint64_t hard_limit = 0; + uint64_t soft_limit; + uint64_t hard_limit; - max_result_size() = default; + max_result_size() = delete; explicit max_result_size(uint64_t max_size) : soft_limit(max_size), hard_limit(max_size) { } explicit max_result_size(uint64_t soft_limit, uint64_t hard_limit) : soft_limit(soft_limit), hard_limit(hard_limit) { } }; @@ -40,9 +38,4 @@ inline bool operator==(const max_result_size& a, const max_result_size& b) { return a.soft_limit == b.soft_limit && a.hard_limit == b.hard_limit; } -struct query_class_config { - reader_concurrency_semaphore& semaphore; - max_result_size max_memory_for_unlimited_query; -}; - } diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 48a3cc7ca4..50ae2dd0af 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -93,6 +93,7 @@ class reader_permit::impl uint64_t _blocked_branches = 0; bool _marked_as_blocked = false; db::timeout_clock::time_point _timeout; + query::max_result_size _max_result_size{query::result_memory_limiter::unlimited_result_size}; private: void on_permit_used() { @@ -320,6 +321,14 @@ public: } _timeout = timeout; } + + query::max_result_size max_result_size() const { + return _max_result_size; + } + + void set_max_result_size(query::max_result_size s) { + _max_result_size = std::move(s); + } }; static_assert(std::is_nothrow_copy_constructible_v); @@ -412,6 +421,14 @@ void reader_permit::set_timeout(db::timeout_clock::time_point timeout) noexcept _impl->set_timeout(timeout); } +query::max_result_size reader_permit::max_result_size() const { + return _impl->max_result_size(); +} + +void reader_permit::set_max_result_size(query::max_result_size s) { + _impl->set_max_result_size(std::move(s)); +} + std::ostream& operator<<(std::ostream& os, reader_permit::state s) { switch (s) { case reader_permit::state::waiting: @@ -625,7 +642,8 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na : reader_concurrency_semaphore( std::numeric_limits::max(), std::numeric_limits::max(), - std::move(name)) {} + std::move(name), + std::numeric_limits::max()) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { if (!_stats.total_permits) { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 455ebd551e..de8af04a5d 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -239,7 +239,7 @@ public: reader_concurrency_semaphore(int count, ssize_t memory, sstring name, - size_t max_queue_length = std::numeric_limits::max()); + size_t max_queue_length); /// Create a semaphore with practically unlimited count and memory. /// @@ -247,6 +247,17 @@ public: /// The semaphore's name has to be unique! explicit reader_concurrency_semaphore(no_limits, sstring name); + /// A helper constructor *only for tests* that supplies default arguments. + /// The other constructors have default values removed so 'production-code' + /// is forced to specify all of them manually to avoid bugs. + struct for_tests{}; + reader_concurrency_semaphore(for_tests, sstring name, + int count = std::numeric_limits::max(), + ssize_t memory = std::numeric_limits::max(), + size_t max_queue_length = std::numeric_limits::max()) + : reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length) + {} + ~reader_concurrency_semaphore(); reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete; diff --git a/reader_permit.hh b/reader_permit.hh index dc9d4a094e..8a0c85e659 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -26,6 +26,7 @@ #include "db/timeout_clock.hh" #include "schema_fwd.hh" +#include "query_class_config.hh" namespace seastar { class file; @@ -165,6 +166,9 @@ public: db::timeout_clock::time_point timeout() const noexcept; void set_timeout(db::timeout_clock::time_point timeout) noexcept; + + query::max_result_size max_result_size() const; + void set_max_result_size(query::max_result_size); }; using reader_permit_opt = optimized_optional; diff --git a/table.cc b/table.cc index b3388b0634..6fee878607 100644 --- a/table.cc +++ b/table.cc @@ -1956,7 +1956,6 @@ future> table::query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& partition_ranges, tracing::trace_state_ptr trace_state, @@ -1981,7 +1980,8 @@ table::query(schema_ptr s, const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); auto accounter = co_await (opts.request == query::result_request::only_digest - ? memory_limiter.new_digest_read(*cmd.max_result_size, short_read_allowed) : memory_limiter.new_data_read(*cmd.max_result_size, short_read_allowed)); + ? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed) + : memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed)); query_state qs(s, cmd, opts, partition_ranges, std::move(accounter)); @@ -2001,8 +2001,7 @@ table::query(schema_ptr s, std::exception_ptr ex; try { - co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, - class_config.max_memory_for_unlimited_query); + co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, permit.max_result_size()); } catch (...) { ex = std::current_exception(); } @@ -2030,7 +2029,6 @@ future table::mutation_query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, const dht::partition_range& range, tracing::trace_state_ptr trace_state, query::result_memory_accounter accounter, @@ -2056,7 +2054,7 @@ table::mutation_query(schema_ptr s, // legacy format. auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s; auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter)); - auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query); + auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, permit.max_result_size()); if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) { co_await q.close(); diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index ac8297757b..e1e39797cd 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -425,7 +425,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { simple_schema s; - reader_concurrency_semaphore sem(1, 100, get_name()); + reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index c0b725dc71..311ea94223 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3419,7 +3419,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) } SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { - reader_concurrency_semaphore semaphore(1, 0, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0); auto stop_sem = deferred_stop(semaphore); simple_schema s; auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index f18d47ff17..d786280ada 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -163,7 +163,7 @@ public: }; test_querier_cache(const noncopyable_function& external_make_value, std::chrono::seconds entry_ttl = 24h, ssize_t max_memory = std::numeric_limits::max()) - : _sem(std::numeric_limits::max(), max_memory, "test_querier_cache") + : _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits::max(), max_memory) , _cache(entry_ttl) , _mutations(make_mutations(_s, external_make_value)) , _mutation_source([this] (schema_ptr, reader_permit permit, const dht::partition_range& range) { diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 5112ef8ab9..c0740b98b0 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -63,7 +63,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_releases_units) { simple_schema s; const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); // Not admitted, active @@ -123,7 +123,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves simple_schema s; const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; const auto base_resources = reader_concurrency_semaphore::resources{1, 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); @@ -279,7 +279,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { #endif simple_schema s; - reader_concurrency_semaphore semaphore(count, count * 1024, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), count, count * 1024); auto stop_sem = deferred_stop(semaphore); std::vector> readers; @@ -396,7 +396,7 @@ class dummy_file_impl : public file_impl { SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { - reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024); auto stop_sem = deferred_stop(semaphore); auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get(); @@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) { SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { return async([&] () { - reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 2, new_reader_base_cost); auto stop_sem = deferred_stop(semaphore); { @@ -496,7 +496,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) { return async([&] () { - reader_concurrency_semaphore semaphore(1, new_reader_base_cost, get_name(), 2); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost, 2); auto stop_sem = deferred_stop(semaphore); { @@ -599,7 +599,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { simple_schema s; const auto schema_ptr = s.schema().get(); const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); auto require_can_admit = [&] (bool expected_can_admit, const char* description, @@ -841,7 +841,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 0); diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 74f8f940a4..4998b80744 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -796,7 +796,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { } }; - reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name()); + reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost); auto stop_sem = deferred_stop(sem); auto schema = schema_builder("ks", "cf") diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 56852d7b73..1ac0f95cb4 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -96,7 +96,7 @@ public: auto name = format("tests::reader_lifecycle_policy@{}@shard_id={}", fmt::ptr(this), shard); if (_evict_paused_readers) { // Create with no memory, so all inactive reads are immediately evicted. - _contexts[shard]->semaphore.emplace(1, 0, std::move(name)); + _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0); } else { _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name)); } diff --git a/test/perf/perf_row_cache_reads.cc b/test/perf/perf_row_cache_reads.cc index 99b3670022..03899d61d4 100644 --- a/test/perf/perf_row_cache_reads.cc +++ b/test/perf/perf_row_cache_reads.cc @@ -211,7 +211,7 @@ void test_scan_with_range_delete_over_rows() { std::numeric_limits::max(), std::numeric_limits::max(), gc_clock::now(), - query::max_result_size()).get(); + query::max_result_size(query::result_memory_limiter::unlimited_result_size)).get(); }); slm.stop();