table, database: query, mutation_query: remove unnecessary class_config param

The semaphore inside was never accessed and `max_memory_for_unlimited_query`
was always equal to `*cmd.max_result_size` so the parameter was completely
redundant.

`cmd.max_result_size` is supposed to be always set in the affected
functions - which are executed on the replica side - as soon as the
replica receives the `read_command` object, in case the parameter was
not set by the coordinator. However, we don't have a guarantee at the
type level (it's still an `optional`). Many places used
`*cmd.max_result_size` without even an assertion.

We make the code a bit safer, we check for `cmd.max_result_size` and if
it's indeed engaged, store it in `reader_permit`. We then access it from
`reader_permit` where necessary. If `cmd.max_result_size` is not set, we
assume this is an unlimited query and obtain the limit from
`get_unlimited_query_max_result_size`.
This commit is contained in:
Kamil Braun
2021-08-23 15:57:55 +02:00
parent e8824986dd
commit c12e265eb8
4 changed files with 12 additions and 21 deletions

View File

@@ -1385,7 +1385,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
column_family& cf = find_column_family(cmd.cf_id);
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
std::optional<query::data_querier> querier_opt;
lw_shared_ptr<query::result> result;
@@ -1397,7 +1397,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
auto read_func = [&, this] (reader_permit permit) {
reader_permit::used_guard ug{permit};
return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(),
permit.set_max_result_size(max_result_size);
return cf.query(std::move(s), std::move(permit), cmd, opts, ranges, trace_state, get_result_memory_limiter(),
timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr<query::result> res) {
result = std::move(res);
});
@@ -1442,10 +1443,10 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
}
const auto short_read_allwoed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed);
column_family& cf = find_column_family(cmd.cf_id);
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
auto accounter = co_await get_result_memory_limiter().new_mutation_read(max_result_size, short_read_allwoed);
column_family& cf = find_column_family(cmd.cf_id);
std::optional<query::mutation_querier> querier_opt;
reconcilable_result result;
@@ -1457,7 +1458,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
auto read_func = [&, this] (reader_permit permit) {
reader_permit::used_guard ug{permit};
return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range,
permit.set_max_result_size(max_result_size);
return cf.mutation_query(std::move(s), std::move(permit), cmd, range,
std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) {
result = std::move(res);
});

View File

@@ -751,7 +751,6 @@ public:
query(schema_ptr,
reader_permit permit,
const query::read_command& cmd,
query::query_class_config class_config,
query::result_options opts,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
@@ -779,7 +778,6 @@ public:
mutation_query(schema_ptr s,
reader_permit permit,
const query::read_command& cmd,
query::query_class_config class_config,
const dht::partition_range& range,
tracing::trace_state_ptr trace_state,
query::result_memory_accounter accounter,

View File

@@ -23,8 +23,6 @@
#include <cinttypes>
class reader_concurrency_semaphore;
namespace query {
struct max_result_size {
@@ -40,9 +38,4 @@ inline bool operator==(const max_result_size& a, const max_result_size& b) {
return a.soft_limit == b.soft_limit && a.hard_limit == b.hard_limit;
}
struct query_class_config {
reader_concurrency_semaphore& semaphore;
max_result_size max_memory_for_unlimited_query;
};
}

View File

@@ -1956,7 +1956,6 @@ future<lw_shared_ptr<query::result>>
table::query(schema_ptr s,
reader_permit permit,
const query::read_command& cmd,
query::query_class_config class_config,
query::result_options opts,
const dht::partition_range_vector& partition_ranges,
tracing::trace_state_ptr trace_state,
@@ -1981,7 +1980,8 @@ table::query(schema_ptr s,
const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
auto accounter = co_await (opts.request == query::result_request::only_digest
? memory_limiter.new_digest_read(*cmd.max_result_size, short_read_allowed) : memory_limiter.new_data_read(*cmd.max_result_size, short_read_allowed));
? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed)
: memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed));
query_state qs(s, cmd, opts, partition_ranges, std::move(accounter));
@@ -2001,8 +2001,7 @@ table::query(schema_ptr s,
std::exception_ptr ex;
try {
co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp,
class_config.max_memory_for_unlimited_query);
co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, permit.max_result_size());
} catch (...) {
ex = std::current_exception();
}
@@ -2030,7 +2029,6 @@ future<reconcilable_result>
table::mutation_query(schema_ptr s,
reader_permit permit,
const query::read_command& cmd,
query::query_class_config class_config,
const dht::partition_range& range,
tracing::trace_state_ptr trace_state,
query::result_memory_accounter accounter,
@@ -2056,7 +2054,7 @@ table::mutation_query(schema_ptr s,
// legacy format.
auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s;
auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter));
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query);
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, permit.max_result_size());
if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {
co_await q.close();