mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
result_memory_accounter: abort unpaged queries hitting the global limit
The `result_memory_accounter` terminates a query if it reaches either the global or shard-local limit. This used to be so only for paged queries, unpaged ones could grow indefinitely (until the node OOM'd). This was changed infea5067which enforces the local limit on unpaged queries as well, by aborting them. However a loophole remained in the code: `result_memory_accounter::check_and_update()` has another stop condition, besides `check_local_limit()`, it also checks the global limit. This stop condition was not updated to enforce itself on unpaged queries by aborting them, instead it silently terminated them, causing them to return less data then requested. This was masked by most queries reaching the local limit first. This patch fixes this by aborting unpaged mutation queries when they hit the global limit. Fixes: #8162 Tests: unit(release) Signed-off-by: Botond Dénes <bdenes@scylladb.com> Message-Id: <20210226102202.51275-1-bdenes@scylladb.com> (cherry picked from commitdd5a601aaa)
This commit is contained in:
@@ -205,6 +205,10 @@ public:
|
||||
auto to_block = std::min(_used_memory - _blocked_bytes, n);
|
||||
_blocked_bytes += to_block;
|
||||
stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop;
|
||||
if (stop && !_short_read_allowed) {
|
||||
// If we are here we stopped because of the global limit.
|
||||
throw std::runtime_error("Maximum amount of memory for building query results is exhausted, unpaged query cannot be finished");
|
||||
}
|
||||
}
|
||||
return stop;
|
||||
}
|
||||
|
||||
@@ -550,3 +550,71 @@ SEASTAR_THREAD_TEST_CASE(read_max_size) {
|
||||
}
|
||||
}).get();
|
||||
}
|
||||
|
||||
// Check that mutation queries, those that are stopped when the memory
|
||||
// consumed by their results reach the local/global limit, are aborted
|
||||
// instead of silently terminated when this happens.
|
||||
SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) {
|
||||
auto cfg = cql_test_config{};
|
||||
cfg.dbcfg.emplace();
|
||||
// The memory available to the result memory limiter (global limit) is
|
||||
// configured based on the available memory, so give a small amount to
|
||||
// the "node", so we don't have to work with large amount of data.
|
||||
cfg.dbcfg->available_memory = 2 * 1024 * 1024;
|
||||
do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get();
|
||||
auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0();
|
||||
|
||||
auto& db = e.local_db();
|
||||
auto& tab = db.find_column_family("ks", "test");
|
||||
auto s = tab.schema();
|
||||
|
||||
auto pk = make_local_key(s);
|
||||
const auto raw_pk = utf8_type->decompose(data_value(pk));
|
||||
const auto cql3_pk = cql3::raw_value::make_value(raw_pk);
|
||||
|
||||
const auto value = sstring(1024, 'a');
|
||||
const auto raw_value = utf8_type->decompose(data_value(value));
|
||||
const auto cql3_value = cql3::raw_value::make_value(raw_value);
|
||||
|
||||
const int num_rows = 1024;
|
||||
const auto max_size = 1024u * 1024u * 1024u;
|
||||
|
||||
for (int i = 0; i != num_rows; ++i) {
|
||||
const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
|
||||
}
|
||||
|
||||
const auto partition_ranges = std::vector<dht::partition_range>{query::full_partition_range};
|
||||
|
||||
const std::vector<std::pair<sstring, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{
|
||||
{"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
|
||||
return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then(
|
||||
[] (const std::tuple<reconcilable_result, cache_temperature>& res) {
|
||||
return std::get<0>(res).memory_usage();
|
||||
});
|
||||
}},
|
||||
{"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
|
||||
return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then(
|
||||
[] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) {
|
||||
return std::get<0>(res)->memory_usage();
|
||||
});
|
||||
}}
|
||||
};
|
||||
|
||||
for (auto [query_method_name, query_method] : query_methods) {
|
||||
testlog.info("checking: query_method={}", query_method_name);
|
||||
auto slice = s->full_slice();
|
||||
slice.options.remove<query::partition_slice::option::allow_short_read>();
|
||||
query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size));
|
||||
try {
|
||||
auto size = query_method(s, cmd).get0();
|
||||
// Just to ensure we are not interpreting empty results as success.
|
||||
BOOST_REQUIRE(size != 0);
|
||||
BOOST_FAIL("Expected exception, but none was thrown.");
|
||||
} catch (std::runtime_error& e) {
|
||||
testlog.trace("Exception thrown, as expected: {}", e);
|
||||
}
|
||||
}
|
||||
}, std::move(cfg)).get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user