mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge 'reader_permit: make query max result size accessible from the permit' from Kamil Braun
This will make it easier, for example, to enforce memory limits in lower levels of the `flat_mutation_reader` stack.

By default, the query result size is unlimited. However, for specific queries it is possible to store a different value (e.g. obtained from a `read_command` object) through a setter.

An example of this can be seen in the last commit of this PR, where we set the limit to `cmd.max_result_size` if engaged, or to the 'unlimited query' limit (using `database::get_unlimited_query_max_result_size()`) if not.

Refs: #9281. The v2 version of the reverse sstable reader PR will be based on this PR: we'll use the query max result size parameter in one of the readers down the stack where `read_command` is not available but `reader_permit` is.

Closes #9341

* github.com:scylladb/scylla:
  - table, database: query, mutation_query: remove unnecessary class_config param
  - reader_permit: make query max result size accessible from the permit
  - reader_concurrency_semaphore: remove default parameter values from constructors
  - query_class_config: remove query::max_result_size default constructor
This commit is contained in:
20
database.cc
20
database.cc
@@ -340,14 +340,16 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _streaming_concurrency_sem(
|
||||
max_count_streaming_concurrent_reads,
|
||||
max_memory_streaming_concurrent_reads(),
|
||||
"_streaming_concurrency_sem")
|
||||
"_streaming_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
// No limits, just for accounting.
|
||||
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction")
|
||||
, _system_read_concurrency_sem(
|
||||
// Using higher initial concurrency, see revert_initial_system_read_concurrency_boost().
|
||||
max_count_concurrent_reads,
|
||||
max_memory_system_concurrent_reads(),
|
||||
"_system_read_concurrency_sem")
|
||||
"_system_read_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
, _row_cache_tracker(cache_tracker::register_metrics::yes)
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
@@ -1383,7 +1385,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
auto& semaphore = get_reader_concurrency_semaphore();
|
||||
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
|
||||
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
|
||||
|
||||
std::optional<query::data_querier> querier_opt;
|
||||
lw_shared_ptr<query::result> result;
|
||||
@@ -1395,7 +1397,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
|
||||
auto read_func = [&, this] (reader_permit permit) {
|
||||
reader_permit::used_guard ug{permit};
|
||||
return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(),
|
||||
permit.set_max_result_size(max_result_size);
|
||||
return cf.query(std::move(s), std::move(permit), cmd, opts, ranges, trace_state, get_result_memory_limiter(),
|
||||
timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr<query::result> res) {
|
||||
result = std::move(res);
|
||||
});
|
||||
@@ -1440,10 +1443,10 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
}
|
||||
|
||||
const auto short_read_allwoed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
|
||||
auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed);
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
auto& semaphore = get_reader_concurrency_semaphore();
|
||||
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
|
||||
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
|
||||
auto accounter = co_await get_result_memory_limiter().new_mutation_read(max_result_size, short_read_allwoed);
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
|
||||
std::optional<query::mutation_querier> querier_opt;
|
||||
reconcilable_result result;
|
||||
@@ -1455,7 +1458,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
|
||||
auto read_func = [&, this] (reader_permit permit) {
|
||||
reader_permit::used_guard ug{permit};
|
||||
return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range,
|
||||
permit.set_max_result_size(max_result_size);
|
||||
return cf.mutation_query(std::move(s), std::move(permit), cmd, range,
|
||||
std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) {
|
||||
result = std::move(res);
|
||||
});
|
||||
|
||||
@@ -755,7 +755,6 @@ public:
|
||||
query(schema_ptr,
|
||||
reader_permit permit,
|
||||
const query::read_command& cmd,
|
||||
query::query_class_config class_config,
|
||||
query::result_options opts,
|
||||
const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
@@ -783,7 +782,6 @@ public:
|
||||
mutation_query(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const query::read_command& cmd,
|
||||
query::query_class_config class_config,
|
||||
const dht::partition_range& range,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
query::result_memory_accounter accounter,
|
||||
|
||||
@@ -23,15 +23,13 @@
|
||||
|
||||
#include <cinttypes>
|
||||
|
||||
class reader_concurrency_semaphore;
|
||||
|
||||
namespace query {
|
||||
|
||||
struct max_result_size {
|
||||
uint64_t soft_limit = 0;
|
||||
uint64_t hard_limit = 0;
|
||||
uint64_t soft_limit;
|
||||
uint64_t hard_limit;
|
||||
|
||||
max_result_size() = default;
|
||||
max_result_size() = delete;
|
||||
explicit max_result_size(uint64_t max_size) : soft_limit(max_size), hard_limit(max_size) { }
|
||||
explicit max_result_size(uint64_t soft_limit, uint64_t hard_limit) : soft_limit(soft_limit), hard_limit(hard_limit) { }
|
||||
};
|
||||
@@ -40,9 +38,4 @@ inline bool operator==(const max_result_size& a, const max_result_size& b) {
|
||||
return a.soft_limit == b.soft_limit && a.hard_limit == b.hard_limit;
|
||||
}
|
||||
|
||||
struct query_class_config {
|
||||
reader_concurrency_semaphore& semaphore;
|
||||
max_result_size max_memory_for_unlimited_query;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -93,6 +93,7 @@ class reader_permit::impl
|
||||
uint64_t _blocked_branches = 0;
|
||||
bool _marked_as_blocked = false;
|
||||
db::timeout_clock::time_point _timeout;
|
||||
query::max_result_size _max_result_size{query::result_memory_limiter::unlimited_result_size};
|
||||
|
||||
private:
|
||||
void on_permit_used() {
|
||||
@@ -320,6 +321,14 @@ public:
|
||||
}
|
||||
_timeout = timeout;
|
||||
}
|
||||
|
||||
query::max_result_size max_result_size() const {
|
||||
return _max_result_size;
|
||||
}
|
||||
|
||||
void set_max_result_size(query::max_result_size s) {
|
||||
_max_result_size = std::move(s);
|
||||
}
|
||||
};
|
||||
|
||||
static_assert(std::is_nothrow_copy_constructible_v<reader_permit>);
|
||||
@@ -412,6 +421,14 @@ void reader_permit::set_timeout(db::timeout_clock::time_point timeout) noexcept
|
||||
_impl->set_timeout(timeout);
|
||||
}
|
||||
|
||||
query::max_result_size reader_permit::max_result_size() const {
|
||||
return _impl->max_result_size();
|
||||
}
|
||||
|
||||
void reader_permit::set_max_result_size(query::max_result_size s) {
|
||||
_impl->set_max_result_size(std::move(s));
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s) {
|
||||
switch (s) {
|
||||
case reader_permit::state::waiting:
|
||||
@@ -625,7 +642,8 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
|
||||
: reader_concurrency_semaphore(
|
||||
std::numeric_limits<int>::max(),
|
||||
std::numeric_limits<ssize_t>::max(),
|
||||
std::move(name)) {}
|
||||
std::move(name),
|
||||
std::numeric_limits<size_t>::max()) {}
|
||||
|
||||
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
if (!_stats.total_permits) {
|
||||
|
||||
@@ -239,7 +239,7 @@ public:
|
||||
reader_concurrency_semaphore(int count,
|
||||
ssize_t memory,
|
||||
sstring name,
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max());
|
||||
size_t max_queue_length);
|
||||
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
///
|
||||
@@ -247,6 +247,17 @@ public:
|
||||
/// The semaphore's name has to be unique!
|
||||
explicit reader_concurrency_semaphore(no_limits, sstring name);
|
||||
|
||||
/// A helper constructor *only for tests* that supplies default arguments.
|
||||
/// The other constructors have default values removed so 'production-code'
|
||||
/// is forced to specify all of them manually to avoid bugs.
|
||||
struct for_tests{};
|
||||
reader_concurrency_semaphore(for_tests, sstring name,
|
||||
int count = std::numeric_limits<int>::max(),
|
||||
ssize_t memory = std::numeric_limits<ssize_t>::max(),
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max())
|
||||
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length)
|
||||
{}
|
||||
|
||||
~reader_concurrency_semaphore();
|
||||
|
||||
reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete;
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "schema_fwd.hh"
|
||||
#include "query_class_config.hh"
|
||||
|
||||
namespace seastar {
|
||||
class file;
|
||||
@@ -165,6 +166,9 @@ public:
|
||||
db::timeout_clock::time_point timeout() const noexcept;
|
||||
|
||||
void set_timeout(db::timeout_clock::time_point timeout) noexcept;
|
||||
|
||||
query::max_result_size max_result_size() const;
|
||||
void set_max_result_size(query::max_result_size);
|
||||
};
|
||||
|
||||
using reader_permit_opt = optimized_optional<reader_permit>;
|
||||
|
||||
10
table.cc
10
table.cc
@@ -1956,7 +1956,6 @@ future<lw_shared_ptr<query::result>>
|
||||
table::query(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const query::read_command& cmd,
|
||||
query::query_class_config class_config,
|
||||
query::result_options opts,
|
||||
const dht::partition_range_vector& partition_ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
@@ -1981,7 +1980,8 @@ table::query(schema_ptr s,
|
||||
|
||||
const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
|
||||
auto accounter = co_await (opts.request == query::result_request::only_digest
|
||||
? memory_limiter.new_digest_read(*cmd.max_result_size, short_read_allowed) : memory_limiter.new_data_read(*cmd.max_result_size, short_read_allowed));
|
||||
? memory_limiter.new_digest_read(permit.max_result_size(), short_read_allowed)
|
||||
: memory_limiter.new_data_read(permit.max_result_size(), short_read_allowed));
|
||||
|
||||
query_state qs(s, cmd, opts, partition_ranges, std::move(accounter));
|
||||
|
||||
@@ -2001,8 +2001,7 @@ table::query(schema_ptr s,
|
||||
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp,
|
||||
class_config.max_memory_for_unlimited_query);
|
||||
co_await q.consume_page(query_result_builder(*s, qs.builder), qs.remaining_rows(), qs.remaining_partitions(), qs.cmd.timestamp, permit.max_result_size());
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
@@ -2030,7 +2029,6 @@ future<reconcilable_result>
|
||||
table::mutation_query(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const query::read_command& cmd,
|
||||
query::query_class_config class_config,
|
||||
const dht::partition_range& range,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
query::result_memory_accounter accounter,
|
||||
@@ -2056,7 +2054,7 @@ table::mutation_query(schema_ptr s,
|
||||
// legacy format.
|
||||
auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s;
|
||||
auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter));
|
||||
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query);
|
||||
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, permit.max_result_size());
|
||||
|
||||
if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {
|
||||
co_await q.close();
|
||||
|
||||
@@ -425,7 +425,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
|
||||
|
||||
simple_schema s;
|
||||
|
||||
reader_concurrency_semaphore sem(1, 100, get_name());
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
|
||||
|
||||
@@ -3419,7 +3419,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
|
||||
reader_concurrency_semaphore semaphore(1, 0, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
|
||||
@@ -163,7 +163,7 @@ public:
|
||||
};
|
||||
|
||||
test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h, ssize_t max_memory = std::numeric_limits<ssize_t>::max())
|
||||
: _sem(std::numeric_limits<int>::max(), max_memory, "test_querier_cache")
|
||||
: _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits<int>::max(), max_memory)
|
||||
, _cache(entry_ttl)
|
||||
, _mutations(make_mutations(_s, external_make_value))
|
||||
, _mutation_source([this] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
|
||||
@@ -63,7 +63,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_releases_units) {
|
||||
simple_schema s;
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
// Not admitted, active
|
||||
@@ -123,7 +123,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
simple_schema s;
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
const auto base_resources = reader_concurrency_semaphore::resources{1, 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -279,7 +279,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
|
||||
#endif
|
||||
|
||||
simple_schema s;
|
||||
reader_concurrency_semaphore semaphore(count, count * 1024, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), count, count * 1024);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
std::vector<std::unique_ptr<reader>> readers;
|
||||
@@ -396,7 +396,7 @@ class dummy_file_impl : public file_impl {
|
||||
|
||||
SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
return async([&] {
|
||||
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get();
|
||||
|
||||
@@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
|
||||
SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
|
||||
return async([&] () {
|
||||
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 2, new_reader_base_cost);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
{
|
||||
@@ -496,7 +496,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
|
||||
|
||||
SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
|
||||
return async([&] () {
|
||||
reader_concurrency_semaphore semaphore(1, new_reader_base_cost, get_name(), 2);
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost, 2);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
{
|
||||
@@ -599,7 +599,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
simple_schema s;
|
||||
const auto schema_ptr = s.schema().get();
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto require_can_admit = [&] (bool expected_can_admit, const char* description,
|
||||
@@ -841,7 +841,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 0);
|
||||
|
||||
@@ -796,7 +796,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
}
|
||||
};
|
||||
|
||||
reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name());
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, new_reader_base_cost);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
|
||||
auto schema = schema_builder("ks", "cf")
|
||||
|
||||
@@ -96,7 +96,7 @@ public:
|
||||
auto name = format("tests::reader_lifecycle_policy@{}@shard_id={}", fmt::ptr(this), shard);
|
||||
if (_evict_paused_readers) {
|
||||
// Create with no memory, so all inactive reads are immediately evicted.
|
||||
_contexts[shard]->semaphore.emplace(1, 0, std::move(name));
|
||||
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0);
|
||||
} else {
|
||||
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name));
|
||||
}
|
||||
|
||||
@@ -211,7 +211,7 @@ void test_scan_with_range_delete_over_rows() {
|
||||
std::numeric_limits<uint32_t>::max(),
|
||||
std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(),
|
||||
query::max_result_size()).get();
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size)).get();
|
||||
});
|
||||
|
||||
slm.stop();
|
||||
|
||||
Reference in New Issue
Block a user