From dd5a601aaace55fae2e5e589da863063ee75b68e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 26 Feb 2021 12:22:02 +0200 Subject: [PATCH] result_memory_accounter: abort unpaged queries hitting the global limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `result_memory_accounter` terminates a query if it reaches either the global or shard-local limit. This used to be so only for paged queries, unpaged ones could grow indefinitely (until the node OOM'd). This was changed in fea5067 which enforces the local limit on unpaged queries as well, by aborting them. However a loophole remained in the code: `result_memory_accounter::check_and_update()` has another stop condition, besides `check_local_limit()`, it also checks the global limit. This stop condition was not updated to enforce itself on unpaged queries by aborting them, instead it silently terminated them, causing them to return less data than requested. This was masked by most queries reaching the local limit first. This patch fixes this by aborting unpaged mutation queries when they hit the global limit. Fixes: #8162 Tests: unit(release) Signed-off-by: Botond Dénes Message-Id: <20210226102202.51275-1-bdenes@scylladb.com> --- query-result.hh | 4 +++ test/boost/database_test.cc | 68 +++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/query-result.hh b/query-result.hh index a5d1007ef2..dfc5e8be7d 100644 --- a/query-result.hh +++ b/query-result.hh @@ -205,6 +205,10 @@ public: auto to_block = std::min(_used_memory - _blocked_bytes, n); _blocked_bytes += to_block; stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop; + if (stop && !_short_read_allowed) { + // If we are here we stopped because of the global limit. 
+ throw std::runtime_error("Maximum amount of memory for building query results is exhausted, unpaged query cannot be finished"); + } } return stop; } diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index e1d5096714..28725060a1 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -550,3 +550,71 @@ SEASTAR_THREAD_TEST_CASE(read_max_size) { } }).get(); } + +// Check that mutation queries, those that are stopped when the memory +// consumed by their results reach the local/global limit, are aborted +// instead of silently terminated when this happens. +SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) { + auto cfg = cql_test_config{}; + cfg.dbcfg.emplace(); + // The memory available to the result memory limiter (global limit) is + // configured based on the available memory, so give a small amount to + // the "node", so we don't have to work with large amount of data. + cfg.dbcfg->available_memory = 2 * 1024 * 1024; + do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get(); + auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0(); + + auto& db = e.local_db(); + auto& tab = db.find_column_family("ks", "test"); + auto s = tab.schema(); + + auto pk = make_local_key(s); + const auto raw_pk = utf8_type->decompose(data_value(pk)); + const auto cql3_pk = cql3::raw_value::make_value(raw_pk); + + const auto value = sstring(1024, 'a'); + const auto raw_value = utf8_type->decompose(data_value(value)); + const auto cql3_value = cql3::raw_value::make_value(raw_value); + + const int num_rows = 1024; + const auto max_size = 1024u * 1024u * 1024u; + + for (int i = 0; i != num_rows; ++i) { + const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i))); + e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get(); + } + + const auto partition_ranges = std::vector{query::full_partition_range}; + + 
const std::vector<std::pair<const char*, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{ + {"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> { + return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then( + [] (const std::tuple<reconcilable_result, cache_temperature>& res) { + return std::get<0>(res).memory_usage(); + }); + }}, + {"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> { + return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then( + [] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) { + return std::get<0>(res)->memory_usage(); + }); + }} + }; + + for (auto [query_method_name, query_method] : query_methods) { + testlog.info("checking: query_method={}", query_method_name); + auto slice = s->full_slice(); + slice.options.remove<query::partition_slice::option::allow_short_read>(); + query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size)); + try { + auto size = query_method(s, cmd).get0(); + // Just to ensure we are not interpreting empty results as success. + BOOST_REQUIRE(size != 0); + BOOST_FAIL("Expected exception, but none was thrown."); + } catch (std::runtime_error& e) { + testlog.trace("Exception thrown, as expected: {}", e); + } + } + }, std::move(cfg)).get(); +}