From c12e265eb8ca30b1291965d9c9c9b81e64c9e899 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Aug 2021 15:57:55 +0200 Subject: [PATCH] table, database: query, mutation_query: remove unnecessary class_config param The semaphore inside was never accessed and `max_memory_for_unlimited_query` was always equal to `*cmd.max_result_size` so the parameter was completely redundant. `cmd.max_result_size` is supposed to be always set in the affected functions - which are executed on the replica side - as soon as the replica receives the `read_command` object, in case the parameter was not set by the coordinator. However, we don't have a guarantee at the type level (it's still an `optional`). Many places used `*cmd.max_result_size` without even an assertion. We make the code a bit safer, we check for `cmd.max_result_size` and if it's indeed engaged, store it in `reader_permit`. We then access it from `reader_permit` where necessary. If `cmd.max_result_size` is not set, we assume this is an unlimited query and obtain the limit from `get_unlimited_query_max_result_size`. --- database.cc | 14 ++++++++------ database.hh | 2 -- query_class_config.hh | 7 ------- table.cc | 10 ++++------ 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/database.cc b/database.cc index 222fd376d6..5566e0674a 100644 --- a/database.cc +++ b/database.cc @@ -1385,7 +1385,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); - auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; + auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size(); std::optional querier_opt; lw_shared_ptr result; @@ -1397,7 +1397,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti auto read_func = [&, this] (reader_permit permit) { reader_permit::used_guard ug{permit}; - return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(), + permit.set_max_result_size(max_result_size); + return cf.query(std::move(s), std::move(permit), cmd, opts, ranges, trace_state, get_result_memory_limiter(), timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr res) { result = std::move(res); }); @@ -1442,10 +1443,10 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh } const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); - auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed); - column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); - auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; + auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size(); + auto accounter = co_await get_result_memory_limiter().new_mutation_read(max_result_size, short_read_allwoed); + column_family& cf = find_column_family(cmd.cf_id); std::optional querier_opt; reconcilable_result result; @@ -1457,7 +1458,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh auto read_func = [&, this] (reader_permit permit) { reader_permit::used_guard ug{permit}; - return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range, + permit.set_max_result_size(max_result_size); + return cf.mutation_query(std::move(s), std::move(permit), cmd, range, std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) { result = std::move(res); }); diff --git a/database.hh b/database.hh index 829958200b..45015fd28f 100644 --- a/database.hh +++ b/database.hh @@ -751,7 +751,6 @@ public: query(schema_ptr, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, @@ -779,7 +778,6 @@ public: mutation_query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, const dht::partition_range& range, tracing::trace_state_ptr trace_state, query::result_memory_accounter accounter, diff --git a/query_class_config.hh b/query_class_config.hh index 8d88b03756..e0f245d298 100644 --- a/query_class_config.hh +++ b/query_class_config.hh @@ -23,8 +23,6 @@ #include -class reader_concurrency_semaphore; - namespace query { struct max_result_size { @@ -40,9 +38,4 @@ inline bool operator==(const max_result_size& a, const max_result_size& b) { return a.soft_limit == b.soft_limit && a.hard_limit == b.hard_limit; } -struct query_class_config { - reader_concurrency_semaphore& semaphore; - max_result_size max_memory_for_unlimited_query; -}; - } diff --git a/table.cc b/table.cc index b3388b0634..6fee878607 100644 --- a/table.cc +++ b/table.cc @@ -1956,7 +1956,6 @@ future> table::query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, query::result_options opts, const dht::partition_range_vector& partition_ranges, tracing::trace_state_ptr trace_state, @@ -1981,7 +1980,8 @@ table::query(schema_ptr s, const auto short_read_allowed = query::short_read(cmd.slice.options.contains()); auto accounter = co_await (opts.request == query::result_request::only_digest - ? memory_limiter.new_digest_read(*cmd.max_result_size, short_read_allowed) : memory_limiter.new_data_read(*cmd.max_result_size, short_read_allowed)); + ? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed) + : memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed)); query_state qs(s, cmd, opts, partition_ranges, std::move(accounter)); @@ -2001,8 +2001,7 @@ table::query(schema_ptr s, std::exception_ptr ex; try { - co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, - class_config.max_memory_for_unlimited_query); + co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, permit.max_result_size()); } catch (...) { ex = std::current_exception(); } @@ -2030,7 +2029,6 @@ future table::mutation_query(schema_ptr s, reader_permit permit, const query::read_command& cmd, - query::query_class_config class_config, const dht::partition_range& range, tracing::trace_state_ptr trace_state, query::result_memory_accounter accounter, @@ -2056,7 +2054,7 @@ table::mutation_query(schema_ptr s, // legacy format. auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s; auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter)); - auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query); + auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, permit.max_result_size()); if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) { co_await q.close();