diff --git a/query-result.hh b/query-result.hh index a5d1007ef2..dfc5e8be7d 100644 --- a/query-result.hh +++ b/query-result.hh @@ -205,6 +205,10 @@ public: auto to_block = std::min(_used_memory - _blocked_bytes, n); _blocked_bytes += to_block; stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop; + if (stop && !_short_read_allowed) { + // If we are here we stopped because of the global limit. + throw std::runtime_error("Maximum amount of memory for building query results is exhausted, unpaged query cannot be finished"); + } } return stop; } diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 98aa1641b3..46844d4157 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -550,3 +550,71 @@ SEASTAR_THREAD_TEST_CASE(read_max_size) { } }).get(); } + +// Check that mutation queries, those that are stopped when the memory +// consumed by their results reach the local/global limit, are aborted +// instead of silently terminated when this happens. +SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) { + auto cfg = cql_test_config{}; + cfg.dbcfg.emplace(); + // The memory available to the result memory limiter (global limit) is + // configured based on the available memory, so give a small amount to + // the "node", so we don't have to work with large amount of data. + cfg.dbcfg->available_memory = 2 * 1024 * 1024; + do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get(); + auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get0(); + + auto& db = e.local_db(); + auto& tab = db.find_column_family("ks", "test"); + auto s = tab.schema(); + + auto pk = make_local_key(s); + const auto raw_pk = utf8_type->decompose(data_value(pk)); + const auto cql3_pk = cql3::raw_value::make_value(raw_pk); + + const auto value = sstring(1024, 'a'); + const auto raw_value = utf8_type->decompose(data_value(value)); + const auto cql3_value = cql3::raw_value::make_value(raw_value); + + const int num_rows = 1024; + const auto max_size = 1024u * 1024u * 1024u; + + for (int i = 0; i != num_rows; ++i) { + const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i))); + e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get(); + } + + const auto partition_ranges = std::vector{query::full_partition_range}; + + const std::vector(schema_ptr, const query::read_command&)>>> query_methods{ + {"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future { + return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then( + [] (const std::tuple& res) { + return std::get<0>(res).memory_usage(); + }); + }}, + {"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future { + return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then( + [] (const std::tuple>, cache_temperature>& res) { + return std::get<0>(res)->memory_usage(); + }); + }} + }; + + for (auto [query_method_name, query_method] : query_methods) { + testlog.info("checking: query_method={}", query_method_name); + auto slice = s->full_slice(); + slice.options.remove(); + query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size)); + try { + auto size = query_method(s, cmd).get0(); + // Just to ensure we are not interpreting empty results as success. + BOOST_REQUIRE(size != 0); + BOOST_FAIL("Expected exception, but none was thrown."); + } catch (std::runtime_error& e) { + testlog.trace("Exception thrown, as expected: {}", e); + } + } + }, std::move(cfg)).get(); +}