mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 09:00:35 +00:00
Merge "Limit non-paged query memory consumption" from Botond
"
Non-paged queries completely ignore the query result size limiter
mechanism. They consume all the memory they want. With sufficiently
large datasets this can easily lead to a handful or even a single
unpaged query producing an OOM.
This series continues the work started by 134d5a5f7, by introducing a
configurable pair of soft/hard limits (defaulting to 1MB/100MB) that is
applied to otherwise unlimited queries, like reverse and unpaged ones.
When an unlimited query reaches the soft limit a warning is logged. This
should give users some heads-up to adjust their application. When the
hard limit is reached the query is aborted. The idea is to not greet
users with failing queries after an upgrade while at the same time
protect the database from the really bad queries. The hard limit should
be decreased from time to time gradually approaching the desired goal of
1MB.
We don't want to limit internal queries, we trust ourselves to either
use another form of memory usage control, or read only small datasets.
So the limit is selected according to the query class. User reads use
the `max_memory_for_unlimited_query_{soft,hard}_limit` configuration
items, while internal reads are not limited. The limit is obtained by
the coordinator, who passes it down to replicas using the existing
`max_result_size` parameter (which is now a special type containing the
two limits), which is now passed on every verb, instead of once per
connection. This ensures that all replicas work with the same limits.
For normal paged queries `max_result_size` is set to the usual
`query::result_memory_limiter::maximum_result_size`. For queries that can
consume unlimited amount of memory -- unpaged and reverse queries --
this is set to the value of the aforementioned
`max_memory_for_unlimited_query_{soft,hard}_limit` configuration item,
but only for user reads, internal reads are not limited.
This has the side-effect that reverse reads now send entire
partitions in a single page, but this is not that bad. The data was
already read, and its size was below the limit, the replica might as well
send it all.
Fixes: #5870
"
* 'nonpaged-query-limit/v5' of https://github.com/denesb/scylla: (26 commits)
test: database_test: add test for enforced max result limit
mutation_partition: abort read when hard limit is exceeded for non-paged reads
query-result.hh: move the definition of short_read to the top
test: cql_test_env: set the max_memory_unlimited_query_{soft,hard}_limit
test: set the allow_short_read slice option for paged queries
partition_slice_builder: add with_option()
result_memory_accounter: remove default constructor
query_*(): use the coordinator specified memory limit for unlimited queries
storage_proxy: use read_command::max_result_size to pass max result size around
query: result_memory_limiter: use the new max_result_size type
query: read_command: add max_result_size
query: read_command: use tagged ints for limit ctor params
query: read_command: add separate convenience constructor
service: query_pager: set the allow_short_read flag
result_memory_accounter: check(): use _maximum_result_size instead of hardcoded limit
storage_proxy: add get_max_result_size()
result_memory_limiter: add unlimited_result_size constant
database: add get_statement_scheduling_group()
database: query_mutations(): obtain the memory accounter inside
query: query_class_config: use max_result_size for the max_memory_for_unlimited_query field
...
This commit is contained in:
@@ -2765,7 +2765,8 @@ SEASTAR_TEST_CASE(test_reversed_slice_with_empty_range_before_all_rows) {
|
||||
// See #6171
|
||||
SEASTAR_TEST_CASE(test_reversed_slice_with_many_clustering_ranges) {
|
||||
cql_test_config cfg;
|
||||
cfg.db_config->max_memory_for_unlimited_query(std::numeric_limits<uint64_t>::max());
|
||||
cfg.db_config->max_memory_for_unlimited_query_soft_limit(std::numeric_limits<uint64_t>::max());
|
||||
cfg.db_config->max_memory_for_unlimited_query_hard_limit(std::numeric_limits<uint64_t>::max());
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE test (pk int, ck int, v text, PRIMARY KEY (pk, ck));").get();
|
||||
auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0();
|
||||
@@ -4574,3 +4575,81 @@ SEASTAR_TEST_CASE(test_impossible_where) {
|
||||
require_rows(e, "SELECT * FROM t2 WHERE c>=10 AND c<=0 ALLOW FILTERING", {});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_query_limit) {
|
||||
cql_test_config cfg;
|
||||
|
||||
cfg.db_config->max_memory_for_unlimited_query_soft_limit.set(256, utils::config_file::config_source::CommandLine);
|
||||
cfg.db_config->max_memory_for_unlimited_query_hard_limit.set(1024, utils::config_file::config_source::CommandLine);
|
||||
|
||||
cfg.dbcfg.emplace();
|
||||
cfg.dbcfg->available_memory = memory::stats().total_memory();
|
||||
cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0();
|
||||
cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0();
|
||||
|
||||
do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE test (pk int, ck int, v text, PRIMARY KEY (pk, ck));").get();
|
||||
auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0();
|
||||
|
||||
const int pk = 0;
|
||||
const auto raw_pk = int32_type->decompose(data_value(pk));
|
||||
const auto cql3_pk = cql3::raw_value::make_value(raw_pk);
|
||||
|
||||
const auto value = sstring(1024, 'a');
|
||||
const auto raw_value = utf8_type->decompose(data_value(value));
|
||||
const auto cql3_value = cql3::raw_value::make_value(raw_value);
|
||||
|
||||
const int num_rows = 10;
|
||||
|
||||
for (int i = 0; i != num_rows; ++i) {
|
||||
const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
|
||||
}
|
||||
|
||||
auto& db = e.local_db();
|
||||
|
||||
const auto make_expected_row = [&] (int ck) -> std::vector<bytes_opt> {
|
||||
return {raw_pk, int32_type->decompose(ck), raw_value};
|
||||
};
|
||||
|
||||
const auto normal_rows = boost::copy_range<std::vector<std::vector<bytes_opt>>>(boost::irange(0, num_rows) | boost::adaptors::transformed(make_expected_row));
|
||||
const auto reversed_rows = boost::copy_range<std::vector<std::vector<bytes_opt>>>(boost::irange(0, num_rows) | boost::adaptors::reversed | boost::adaptors::transformed(make_expected_row));
|
||||
|
||||
for (auto is_paged : {true, false}) {
|
||||
for (auto is_reversed : {true, false}) {
|
||||
for (auto scheduling_group : {db.get_statement_scheduling_group(), db.get_streaming_scheduling_group(), default_scheduling_group()}) {
|
||||
const auto should_fail = (!is_paged || is_reversed) && scheduling_group == db.get_statement_scheduling_group();
|
||||
testlog.info("checking: is_paged={}, is_reversed={}, scheduling_group={}, should_fail={}", is_paged, is_reversed, scheduling_group.name(), should_fail);
|
||||
const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC");
|
||||
|
||||
int32_t page_size = is_paged ? 10000 : -1;
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()});
|
||||
|
||||
const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows;
|
||||
|
||||
try {
|
||||
auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr<cql3::query_options> qo) {
|
||||
return e.execute_cql(q, std::move(qo));
|
||||
}, select_query, std::move(qo)).get0();
|
||||
assert_that(std::move(result))
|
||||
.is_rows()
|
||||
.with_rows(*expected_rows);
|
||||
|
||||
if (should_fail) {
|
||||
BOOST_FAIL("Expected exception, but none was thrown.");
|
||||
} else {
|
||||
testlog.trace("No exception thrown, as expected.");
|
||||
}
|
||||
} catch (exceptions::read_failure_exception& e) {
|
||||
if (should_fail) {
|
||||
testlog.trace("Exception thrown, as expected: {}", e);
|
||||
} else {
|
||||
BOOST_FAIL(fmt::format("Expected no exception, but caught: {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}, std::move(cfg)).get();
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/result_set_assertions.hh"
|
||||
#include "test/lib/reader_permit.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
#include "database.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
@@ -40,6 +41,7 @@
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "db/data_listeners.hh"
|
||||
#include "multishard_mutation_query.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -62,8 +64,8 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
|
||||
|
||||
auto assert_query_result = [&] (size_t expected_size) {
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 1000);
|
||||
auto&& [result, cache_tempature] = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0();
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(1000));
|
||||
auto&& [result, cache_tempature] = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
|
||||
};
|
||||
assert_query_result(1000);
|
||||
@@ -105,22 +107,22 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 3);
|
||||
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0());
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(3));
|
||||
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0());
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
|
||||
}
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
|
||||
query::max_rows, gc_clock::now(), std::nullopt, 5);
|
||||
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0());
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
|
||||
query::row_limit(query::max_rows), query::partition_limit(5));
|
||||
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0());
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(5);
|
||||
}
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
|
||||
query::max_rows, gc_clock::now(), std::nullopt, 3);
|
||||
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size, db::no_timeout).get0());
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
|
||||
query::row_limit(query::max_rows), query::partition_limit(3));
|
||||
auto result = std::get<0>(db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, db::no_timeout).get0());
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
|
||||
}
|
||||
});
|
||||
@@ -472,3 +474,86 @@ SEASTAR_TEST_CASE(toppartitions_cross_shard_schema_ptr) {
|
||||
tq.gather().get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(read_max_size) {
|
||||
do_with_cql_env([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get();
|
||||
auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0();
|
||||
|
||||
auto& db = e.local_db();
|
||||
auto& tab = db.find_column_family("ks", "test");
|
||||
auto s = tab.schema();
|
||||
|
||||
auto pk = make_local_key(s);
|
||||
const auto raw_pk = utf8_type->decompose(data_value(pk));
|
||||
const auto cql3_pk = cql3::raw_value::make_value(raw_pk);
|
||||
|
||||
const auto value = sstring(1024, 'a');
|
||||
const auto raw_value = utf8_type->decompose(data_value(value));
|
||||
const auto cql3_value = cql3::raw_value::make_value(raw_value);
|
||||
|
||||
const int num_rows = 1024;
|
||||
|
||||
for (int i = 0; i != num_rows; ++i) {
|
||||
const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
|
||||
}
|
||||
|
||||
const auto partition_ranges = std::vector<dht::partition_range>{query::full_partition_range};
|
||||
|
||||
const std::vector<std::pair<sstring, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{
|
||||
{"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
|
||||
return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then(
|
||||
[] (const std::tuple<reconcilable_result, cache_temperature>& res) {
|
||||
return std::get<0>(res).memory_usage();
|
||||
});
|
||||
}},
|
||||
{"query()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
|
||||
return db.query(s, cmd, query::result_options::only_result(), partition_ranges, {}, db::no_timeout).then(
|
||||
[] (const std::tuple<lw_shared_ptr<query::result>, cache_temperature>& res) {
|
||||
return size_t(std::get<0>(res)->buf().size());
|
||||
});
|
||||
}},
|
||||
{"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
|
||||
return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then(
|
||||
[] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) {
|
||||
return std::get<0>(res)->memory_usage();
|
||||
});
|
||||
}}
|
||||
};
|
||||
|
||||
for (auto [query_method_name, query_method] : query_methods) {
|
||||
for (auto allow_short_read : {true, false}) {
|
||||
for (auto max_size : {1024u, 1024u * 1024u, 1024u * 1024u * 1024u}) {
|
||||
const auto should_throw = max_size < (num_rows * value.size() * 2) && !allow_short_read;
|
||||
testlog.info("checking: query_method={}, allow_short_read={}, max_size={}, should_throw={}", query_method_name, allow_short_read, max_size, should_throw);
|
||||
auto slice = s->full_slice();
|
||||
if (allow_short_read) {
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
} else {
|
||||
slice.options.remove<query::partition_slice::option::allow_short_read>();
|
||||
}
|
||||
query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size));
|
||||
try {
|
||||
auto size = query_method(s, cmd).get0();
|
||||
// Just to ensure we are not interpreting empty results as success.
|
||||
BOOST_REQUIRE(size != 0);
|
||||
if (should_throw) {
|
||||
BOOST_FAIL("Expected exception, but none was thrown.");
|
||||
} else {
|
||||
testlog.trace("No exception thrown, as expected.");
|
||||
}
|
||||
} catch (std::runtime_error& e) {
|
||||
if (should_throw) {
|
||||
testlog.trace("Exception thrown, as expected: {}", e);
|
||||
} else {
|
||||
BOOST_FAIL(fmt::format("Expected no exception, but caught: {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -592,7 +592,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
||||
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
|
||||
} else {
|
||||
if (reversed) {
|
||||
auto reverse_reader = make_reversing_reader(fmr, size_t(1) << 20);
|
||||
auto reverse_reader = make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20));
|
||||
return reverse_reader.consume(std::move(fsc), db::no_timeout).get0();
|
||||
}
|
||||
return fmr.consume(std::move(fsc), db::no_timeout).get0();
|
||||
@@ -805,7 +805,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
|
||||
auto test_with_partition = [&] (bool with_static_row) {
|
||||
testlog.info("Testing with_static_row={}", with_static_row);
|
||||
auto mut = schema.new_mutation("pk1");
|
||||
const auto pk = "pk1";
|
||||
auto mut = schema.new_mutation(pk);
|
||||
const size_t desired_mut_size = 1 * 1024 * 1024;
|
||||
const size_t row_size = 10 * 1024;
|
||||
|
||||
@@ -817,8 +818,9 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
schema.add_row(mut, schema.make_ckey(++i), sstring(row_size, '0'));
|
||||
}
|
||||
|
||||
const uint64_t hard_limit = size_t(1) << 18;
|
||||
auto reader = flat_mutation_reader_from_mutations({mut});
|
||||
auto reverse_reader = make_reversing_reader(reader, size_t(1) << 10);
|
||||
auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit));
|
||||
|
||||
try {
|
||||
reverse_reader.consume(phony_consumer{}, db::no_timeout).get();
|
||||
@@ -826,7 +828,12 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
} catch (const std::runtime_error& e) {
|
||||
testlog.info("Got exception with message: {}", e.what());
|
||||
auto str = sstring(e.what());
|
||||
BOOST_REQUIRE_EQUAL(str.find("Aborting reverse partition read because partition pk1"), 0);
|
||||
const auto expected_str = format(
|
||||
"Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}",
|
||||
hard_limit,
|
||||
pk);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(str.find(expected_str), 0);
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
|
||||
@@ -82,9 +82,9 @@ SEASTAR_THREAD_TEST_CASE(test_abandoned_read) {
|
||||
(void)_;
|
||||
|
||||
auto cmd = query::read_command(s->id(), s->version(), s->full_slice(), 7, gc_clock::now(), std::nullopt, query::max_partitions,
|
||||
utils::make_random_uuid(), query::is_first_page::yes);
|
||||
utils::make_random_uuid(), query::is_first_page::yes, query::max_result_size(query::result_memory_limiter::unlimited_result_size));
|
||||
|
||||
query_mutations_on_all_shards(env.db(), s, cmd, {query::full_partition_range}, nullptr, std::numeric_limits<uint64_t>::max(), db::no_timeout).get();
|
||||
query_mutations_on_all_shards(env.db(), s, cmd, {query::full_partition_range}, nullptr, db::no_timeout).get();
|
||||
|
||||
check_cache_population(env.db(), 1);
|
||||
|
||||
@@ -104,11 +104,11 @@ static std::vector<mutation> read_all_partitions_one_by_one(distributed<database
|
||||
for (const auto& pkey : pkeys) {
|
||||
const auto res = db.invoke_on(sharder.shard_of(pkey.token()), [gs = global_schema_ptr(s), &pkey] (database& db) {
|
||||
return async([s = gs.get(), &pkey, &db] () mutable {
|
||||
auto accounter = db.get_result_memory_limiter().new_mutation_read(std::numeric_limits<size_t>::max()).get0();
|
||||
const auto cmd = query::read_command(s->id(), s->version(), s->full_slice(), query::max_rows);
|
||||
const auto cmd = query::read_command(s->id(), s->version(), s->full_slice(),
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size));
|
||||
const auto range = dht::partition_range::make_singular(pkey);
|
||||
return make_foreign(std::make_unique<reconcilable_result>(
|
||||
std::get<0>(db.query_mutations(std::move(s), cmd, range, std::move(accounter), nullptr, db::no_timeout).get0())));
|
||||
std::get<0>(db.query_mutations(std::move(s), cmd, range, nullptr, db::no_timeout).get0())));
|
||||
});
|
||||
}).get0();
|
||||
|
||||
@@ -126,13 +126,14 @@ read_partitions_with_paged_scan(distributed<database>& db, schema_ptr s, uint32_
|
||||
const dht::partition_range& range, const query::partition_slice& slice, const std::function<void(size_t)>& page_hook = {}) {
|
||||
const auto query_uuid = is_stateful ? utils::make_random_uuid() : utils::UUID{};
|
||||
std::vector<mutation> results;
|
||||
auto cmd = query::read_command(s->id(), s->version(), slice, page_size, gc_clock::now(), std::nullopt, query::max_partitions, query_uuid, query::is_first_page::yes);
|
||||
auto cmd = query::read_command(s->id(), s->version(), slice, page_size, gc_clock::now(), std::nullopt, query::max_partitions, query_uuid,
|
||||
query::is_first_page::yes, query::max_result_size(max_size));
|
||||
|
||||
bool has_more = true;
|
||||
|
||||
// First page is special, needs to have `is_first_page` set.
|
||||
{
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {range}, nullptr, max_size, db::no_timeout).get0());
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {range}, nullptr, db::no_timeout).get0());
|
||||
for (auto& part : res->partitions()) {
|
||||
auto mut = part.mut().unfreeze(s);
|
||||
results.emplace_back(std::move(mut));
|
||||
@@ -176,7 +177,7 @@ read_partitions_with_paged_scan(distributed<database>& db, schema_ptr s, uint32_
|
||||
cmd.slice.set_range(*s, last_pkey.key(), std::move(ckranges));
|
||||
}
|
||||
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {pkrange}, nullptr, max_size, db::no_timeout).get0());
|
||||
auto res = std::get<0>(query_mutations_on_all_shards(db, s, cmd, {pkrange}, nullptr, db::no_timeout).get0());
|
||||
|
||||
if (is_stateful) {
|
||||
BOOST_REQUIRE(aggregate_querier_cache_stat(db, &query::querier_cache::stats::lookups) >= npages);
|
||||
@@ -880,6 +881,7 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, distributed<database>& db,
|
||||
|
||||
const auto partition_slice = partition_slice_builder(*schema)
|
||||
.with_ranges(generate_clustering_ranges(rnd_engine, *schema, part_descs))
|
||||
.with_option<query::partition_slice::option::allow_short_read>()
|
||||
.build();
|
||||
|
||||
const auto is_stateful = stateful_query(std::uniform_int_distribution<int>(0, 3)(rnd_engine));
|
||||
@@ -972,7 +974,7 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
|
||||
const auto& partitions = pop_desc.partitions;
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(pop_desc.schema), &partitions] {
|
||||
auto& sem = db->local().make_query_class_config().semaphore;
|
||||
auto& sem = db->local().get_reader_concurrency_semaphore();
|
||||
|
||||
auto resources = sem.available_resources();
|
||||
resources -= reader_concurrency_semaphore::resources{1, 0};
|
||||
|
||||
@@ -78,6 +78,10 @@ static query::partition_slice make_full_slice(const schema& s) {
|
||||
|
||||
static auto inf32 = std::numeric_limits<unsigned>::max();
|
||||
|
||||
static query::result_memory_accounter make_accounter() {
|
||||
return query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size };
|
||||
}
|
||||
|
||||
query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
|
||||
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32));
|
||||
}
|
||||
@@ -101,7 +105,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
// FIXME: use mutation assertions
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
@@ -124,7 +128,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -160,7 +164,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -174,7 +178,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -207,7 +211,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -237,7 +241,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -265,7 +269,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -285,7 +289,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -297,7 +301,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -324,7 +328,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -348,7 +352,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -370,7 +374,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -396,7 +400,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.is_empty();
|
||||
@@ -447,7 +451,7 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
auto query_time = now + std::chrono::seconds(1);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time,
|
||||
db::no_timeout, tests::make_query_class_config()).get0();
|
||||
db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(result.partitions().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
@@ -466,28 +470,28 @@ SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
auto src = make_source({m1});
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, tests::make_query_class_config(), make_accounter()).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, tests::make_query_class_config(), make_accounter()).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, tests::make_query_class_config(), make_accounter()).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, tests::make_query_class_config(), make_accounter()).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(s, partition_key::from_single_value(*s, "key2"));
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
|
||||
db::no_timeout, tests::make_query_class_config(), make_accounter()).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
@@ -510,7 +514,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -526,7 +530,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, tests::make_query_class_config(), make_accounter()).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -547,11 +551,13 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
query::partition_slice slice = make_full_slice(*s);
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash},
|
||||
l.new_digest_read(query::max_result_size(query::result_memory_limiter::maximum_result_size), query::short_read::yes).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), digest_only_builder, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash},
|
||||
l.new_data_read(query::max_result_size(query::result_memory_limiter::maximum_result_size), query::short_read::yes).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), result_and_digest_builder, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
|
||||
|
||||
@@ -776,7 +776,8 @@ SEASTAR_TEST_CASE(test_querying_of_mutation) {
|
||||
|
||||
auto resultify = [s] (const mutation& m) -> query::result_set {
|
||||
auto slice = make_full_slice(*s);
|
||||
return query::result_set::from_raw_result(s, slice, m.query(slice));
|
||||
return query::result_set::from_raw_result(s, slice,
|
||||
m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }));
|
||||
};
|
||||
|
||||
mutation m(s, partition_key::from_single_value(*s, "key1"));
|
||||
@@ -811,7 +812,8 @@ SEASTAR_TEST_CASE(test_partition_with_no_live_data_is_absent_in_data_query_resul
|
||||
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
assert_that(query::result_set::from_raw_result(s, slice, m.query(slice)))
|
||||
assert_that(query::result_set::from_raw_result(s, slice,
|
||||
m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size })))
|
||||
.is_empty();
|
||||
});
|
||||
}
|
||||
@@ -834,7 +836,8 @@ SEASTAR_TEST_CASE(test_partition_with_live_data_in_static_row_is_present_in_the_
|
||||
.with_regular_column("v")
|
||||
.build();
|
||||
|
||||
assert_that(query::result_set::from_raw_result(s, slice, m.query(slice)))
|
||||
assert_that(query::result_set::from_raw_result(s, slice,
|
||||
m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size })))
|
||||
.has_only(a_row()
|
||||
.with_column("pk", data_value(bytes("key1")))
|
||||
.with_column("v", data_value::make_null(bytes_type)));
|
||||
@@ -857,7 +860,8 @@ SEASTAR_TEST_CASE(test_query_result_with_one_regular_column_missing) {
|
||||
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
|
||||
assert_that(query::result_set::from_raw_result(s, slice, m.query(slice)))
|
||||
assert_that(query::result_set::from_raw_result(s, slice,
|
||||
m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size })))
|
||||
.has_only(a_row()
|
||||
.with_column("pk", data_value(bytes("key1")))
|
||||
.with_column("ck", data_value(bytes("ck:A")))
|
||||
@@ -1243,8 +1247,10 @@ SEASTAR_TEST_CASE(test_query_digest) {
|
||||
auto check_digests_equal = [] (const mutation& m1, const mutation& m2) {
|
||||
auto ps1 = partition_slice_builder(*m1.schema()).build();
|
||||
auto ps2 = partition_slice_builder(*m2.schema()).build();
|
||||
auto digest1 = *m1.query(ps1, query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
|
||||
auto digest2 = *m2.query(ps2, query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
|
||||
auto digest1 = *m1.query(ps1, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size },
|
||||
query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
|
||||
auto digest2 = *m2.query(ps2, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size },
|
||||
query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
|
||||
if (digest1 != digest2) {
|
||||
BOOST_FAIL(format("Digest should be the same for {} and {}", m1, m2));
|
||||
}
|
||||
@@ -1493,7 +1499,8 @@ SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
|
||||
.without_partition_key_columns()
|
||||
.build();
|
||||
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
|
||||
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
|
||||
return query::result_set::from_raw_result(s, slice,
|
||||
m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, opts, t));
|
||||
};
|
||||
|
||||
mutation m(s, pk);
|
||||
@@ -1557,7 +1564,8 @@ SEASTAR_TEST_CASE(test_querying_expired_cells) {
|
||||
.without_partition_key_columns()
|
||||
.build();
|
||||
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
|
||||
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
|
||||
return query::result_set::from_raw_result(s, slice,
|
||||
m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, opts, t));
|
||||
};
|
||||
|
||||
{
|
||||
|
||||
@@ -214,7 +214,7 @@ public:
|
||||
|
||||
auto querier = make_querier<Querier>(range);
|
||||
auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), db::no_timeout, std::numeric_limits<uint64_t>::max()).get0();
|
||||
gc_clock::now(), db::no_timeout, query::max_result_size(std::numeric_limits<uint64_t>::max())).get0();
|
||||
const auto memory_usage = querier.memory_usage();
|
||||
_cache.insert(cache_key, std::move(querier), nullptr);
|
||||
|
||||
@@ -658,25 +658,28 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
auto s = cf.schema();
|
||||
|
||||
cf.flush().get();
|
||||
auto slice = s->full_slice();
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
auto cmd1 = query::read_command(s->id(),
|
||||
s->version(),
|
||||
s->full_slice(),
|
||||
slice,
|
||||
1,
|
||||
gc_clock::now(),
|
||||
std::nullopt,
|
||||
1,
|
||||
utils::make_random_uuid());
|
||||
utils::make_random_uuid(),
|
||||
query::is_first_page::yes,
|
||||
query::max_result_size(1024 * 1024));
|
||||
|
||||
// Should save the querier in cache.
|
||||
db.query_mutations(s,
|
||||
cmd1,
|
||||
query::full_partition_range,
|
||||
db.get_result_memory_limiter().new_mutation_read(1024 * 1024).get0(),
|
||||
nullptr,
|
||||
db::no_timeout).get();
|
||||
|
||||
auto& semaphore = db.make_query_class_config().semaphore;
|
||||
auto& semaphore = db.get_reader_concurrency_semaphore();
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0);
|
||||
@@ -695,18 +698,19 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
|
||||
auto cmd2 = query::read_command(s->id(),
|
||||
s->version(),
|
||||
s->full_slice(),
|
||||
slice,
|
||||
1,
|
||||
gc_clock::now(),
|
||||
std::nullopt,
|
||||
1,
|
||||
utils::make_random_uuid());
|
||||
utils::make_random_uuid(),
|
||||
query::is_first_page::no,
|
||||
query::max_result_size(1024 * 1024));
|
||||
|
||||
// Should evict the already cached querier.
|
||||
db.query_mutations(s,
|
||||
cmd2,
|
||||
query::full_partition_range,
|
||||
db.get_result_memory_limiter().new_mutation_read(1024 * 1024).get0(),
|
||||
nullptr,
|
||||
db::no_timeout).get();
|
||||
|
||||
@@ -720,10 +724,10 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
// of the tracked buffers.
|
||||
cmd2.row_limit = query::max_rows;
|
||||
cmd2.partition_limit = query::max_partitions;
|
||||
cmd2.max_result_size.emplace(query::result_memory_limiter::unlimited_result_size);
|
||||
db.query_mutations(s,
|
||||
cmd2,
|
||||
query::full_partition_range,
|
||||
db.get_result_memory_limiter().new_mutation_read(1024 * 1024 * 1024 * 1024).get0(),
|
||||
nullptr,
|
||||
db::no_timeout).get();
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -555,7 +555,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
|
||||
auto& sem = *with_scheduling_group(e.local_db().get_streaming_scheduling_group(), [&] () {
|
||||
return &e.local_db().make_query_class_config().semaphore;
|
||||
return &e.local_db().get_reader_concurrency_semaphore();
|
||||
}).get0();
|
||||
|
||||
// consume all units except what is needed to admit a single reader.
|
||||
|
||||
@@ -409,6 +409,13 @@ public:
|
||||
create_directories((cfg->view_hints_directory() + "/" + std::to_string(i)).c_str());
|
||||
}
|
||||
|
||||
if (!cfg->max_memory_for_unlimited_query_soft_limit.is_set()) {
|
||||
cfg->max_memory_for_unlimited_query_soft_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
|
||||
}
|
||||
if (!cfg->max_memory_for_unlimited_query_hard_limit.is_set()) {
|
||||
cfg->max_memory_for_unlimited_query_hard_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
|
||||
}
|
||||
|
||||
sharded<locator::token_metadata> token_metadata;
|
||||
token_metadata.start().get();
|
||||
auto stop_token_metadata = defer([&token_metadata] { token_metadata.stop().get(); });
|
||||
|
||||
@@ -33,8 +33,8 @@ reader_permit make_permit() {
|
||||
return the_semaphore.make_permit();
|
||||
}
|
||||
|
||||
query_class_config make_query_class_config() {
|
||||
return query_class_config{the_semaphore, std::numeric_limits<uint64_t>::max()};
|
||||
query::query_class_config make_query_class_config() {
|
||||
return query::query_class_config{the_semaphore, query::max_result_size(std::numeric_limits<uint64_t>::max())};
|
||||
}
|
||||
|
||||
} // namespace tests
|
||||
|
||||
@@ -30,6 +30,6 @@ reader_concurrency_semaphore& semaphore();
|
||||
|
||||
reader_permit make_permit();
|
||||
|
||||
query_class_config make_query_class_config();
|
||||
query::query_class_config make_query_class_config();
|
||||
|
||||
} // namespace tests
|
||||
|
||||
@@ -308,7 +308,7 @@ int main(int argc, char** argv) {
|
||||
auto prev_occupancy = logalloc::shard_tracker().occupancy();
|
||||
testlog.info("Occupancy before: {}", prev_occupancy);
|
||||
|
||||
auto& sem = env.local_db().make_query_class_config().semaphore;
|
||||
auto& sem = env.local_db().get_reader_concurrency_semaphore();
|
||||
|
||||
testlog.info("Reading");
|
||||
stats_collector sc(sem, stats_collector_params);
|
||||
|
||||
@@ -196,7 +196,8 @@ static sizes calculate_sizes(cache_tracker& tracker, const mutation_settings& se
|
||||
result.cache = tracker.region().occupancy().used_space() - cache_initial_occupancy;
|
||||
result.frozen = freeze(m).representation().size();
|
||||
result.canonical = canonical_mutation(m).representation().size();
|
||||
result.query_result = m.query(partition_slice_builder(*s).build(), query::result_options::only_result()).buf().size();
|
||||
result.query_result = m.query(partition_slice_builder(*s).build(),
|
||||
query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::result_options::only_result()).buf().size();
|
||||
|
||||
tmpdir sstable_dir;
|
||||
sstables::test_env env;
|
||||
|
||||
Reference in New Issue
Block a user