diff --git a/database.cc b/database.cc index 33cc2b47fb..01423dd36a 100644 --- a/database.cc +++ b/database.cc @@ -929,8 +929,10 @@ keyspace::make_column_family_config(const schema& s, const database& db) const { // avoid self-reporting if (is_system_table(s)) { cfg.sstables_manager = &db.get_system_sstables_manager(); + cfg.max_memory_for_unlimited_query = std::numeric_limits::max(); } else { cfg.sstables_manager = &db.get_user_sstables_manager(); + cfg.max_memory_for_unlimited_query = db_config.max_memory_for_unlimited_query(); } cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore; @@ -1206,6 +1208,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh cmd.row_limit, cmd.partition_limit, cmd.timestamp, + cf.get_config().max_memory_for_unlimited_query, std::move(accounter), std::move(trace_state), timeout, diff --git a/database.hh b/database.hh index d968118996..30d8bbca58 100644 --- a/database.hh +++ b/database.hh @@ -389,6 +389,7 @@ public: db::timeout_semaphore* view_update_concurrency_semaphore; size_t view_update_concurrency_semaphore_limit; db::data_listeners* data_listeners = nullptr; + utils::updateable_value max_memory_for_unlimited_query; }; struct no_commitlog {}; @@ -915,6 +916,10 @@ public: return _config.cf_stats; } + const config& get_config() const { + return _config; + } + compaction_manager& get_compaction_manager() const { return _compaction_manager; } diff --git a/db/config.cc b/db/config.cc index f7425fa961..738a610c9f 100644 --- a/db/config.cc +++ b/db/config.cc @@ -721,6 +721,8 @@ db::config::config(std::shared_ptr exts) , max_clustering_key_restrictions_per_query(this, "max_clustering_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100, "Maximum number of distinct clustering key restrictions per query. This limit places a bound on the size of IN tuples, " "especially when multiple clustering key columns have IN restrictions. 
Increasing this value can result in server instability.") + , max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20, + "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.") , enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false, "Set to true if the cluster was initially installed from 3.1.0. If it was upgraded from an earlier version," " or installed from a later version, leave this set to false. This adjusts the communication protocol to" diff --git a/db/config.hh b/db/config.hh index 0a6ab4e819..f257838981 100644 --- a/db/config.hh +++ b/db/config.hh @@ -302,6 +302,7 @@ public: named_value abort_on_internal_error; named_value max_partition_key_restrictions_per_query; named_value max_clustering_key_restrictions_per_query; + named_value max_memory_for_unlimited_query; named_value enable_3_1_0_compatibility_mode; named_value enable_user_defined_functions; named_value user_defined_function_time_limit_ms; diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index f2d2c97c2b..7ed4996020 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -59,14 +59,14 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() { _buffer_size = compute_buffer_size(*_schema, _buffer); } -flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutation_reader::impl& original) { - // FIXME: #1413 Full partitions get accumulated in memory. 
- +flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption) { class partition_reversing_mutation_reader final : public flat_mutation_reader::impl { - flat_mutation_reader::impl* _source; + flat_mutation_reader* _source; range_tombstone_list _range_tombstones; std::stack _mutation_fragments; mutation_fragment_opt _partition_end; + size_t _stack_size = 0; + const size_t _max_stack_size; private: stop_iteration emit_partition() { auto emit_range_tombstone = [&] { @@ -76,12 +76,13 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio auto rt_owner = alloc_strategy_unique_ptr(&rt); push_mutation_fragment(mutation_fragment(std::move(rt))); }; - position_in_partition::less_compare cmp(*_source->_schema); + position_in_partition::less_compare cmp(*_schema); while (!_mutation_fragments.empty() && !is_buffer_full()) { auto& mf = _mutation_fragments.top(); if (!_range_tombstones.empty() && !cmp(_range_tombstones.tombstones().rbegin()->end_position(), mf.position())) { emit_range_tombstone(); } else { + _stack_size -= mf.memory_usage(*_schema); push_mutation_fragment(std::move(mf)); _mutation_fragments.pop(); } @@ -113,18 +114,35 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio return make_ready_future(stop_iteration::yes); } } else if (mf.is_range_tombstone()) { - _range_tombstones.apply(*_source->_schema, std::move(mf.as_range_tombstone())); + _range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone())); } else { _mutation_fragments.emplace(std::move(mf)); + _stack_size += _mutation_fragments.top().memory_usage(*_schema); + if (_stack_size >= _max_stack_size) { + const partition_key* key = nullptr; + auto it = buffer().end(); + --it; + if (it->is_partition_start()) { + key = &it->as_partition_start().key().key(); + } else { + --it; + key = &it->as_partition_start().key().key(); + } + throw std::runtime_error(fmt::format( + "Aborting reverse partition 
read because partition {} is larger than the maximum safe size of {} for reversible partitions.", + key->with_schema(*_schema), + _max_stack_size)); + } } } return make_ready_future(is_buffer_full()); } public: - explicit partition_reversing_mutation_reader(flat_mutation_reader::impl& mr) - : flat_mutation_reader::impl(mr._schema) + explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, size_t max_stack_size) + : flat_mutation_reader::impl(mr.schema()) , _source(&mr) - , _range_tombstones(*mr._schema) + , _range_tombstones(*_schema) + , _max_stack_size(max_stack_size) { } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { @@ -145,6 +163,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio clear_buffer_to_next_partition(); if (is_buffer_empty() && !is_end_of_stream()) { while (!_mutation_fragments.empty()) { + _stack_size -= _mutation_fragments.top().memory_usage(*_schema); _mutation_fragments.pop(); } _range_tombstones.clear(); @@ -165,7 +184,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio } }; - return make_flat_mutation_reader(original); + return make_flat_mutation_reader(original, max_memory_consumption); } template diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 0134feae5e..bac6ea0bd4 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -79,15 +79,6 @@ GCC6_CONCEPT( */ class flat_mutation_reader final { public: - // Causes a stream of reversed mutations to be emitted. - // 1. Static row is still emitted first. - // 2. Range tombstones are ordered by their end position. - // 3. Clustered rows and range tombstones are emitted in descending order. - // Because of 2 and 3 the guarantee that a range tombstone is emitted before - // any mutation fragment affected by it still holds. - // Ordering of partitions themselves remains unchanged. 
- using consume_reversed_partitions = seastar::bool_class; - class impl { private: circular_buffer _buffer; @@ -122,8 +113,6 @@ public: const circular_buffer& buffer() const { return _buffer; } - private: - static flat_mutation_reader reverse_partitions(flat_mutation_reader::impl&); public: impl(schema_ptr s) : _schema(std::move(s)) { } virtual ~impl() {} @@ -353,14 +342,7 @@ public: GCC6_CONCEPT( requires FlattenedConsumer() ) - auto consume(Consumer consumer, - db::timeout_clock::time_point timeout, - consume_reversed_partitions reversed = consume_reversed_partitions::no) { - if (reversed) { - return do_with(impl::reverse_partitions(*_impl), [&] (auto& reversed_partition_stream) { - return reversed_partition_stream._impl->consume(std::move(consumer), timeout); - }); - } + auto consume(Consumer consumer, db::timeout_clock::time_point timeout) { return _impl->consume(std::move(consumer), timeout); } @@ -747,6 +729,27 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db: flat_mutation_reader make_generating_reader(schema_ptr s, std::function ()> get_next_fragment); +/// A reader that emits partitions in reverse. +/// +/// 1. Static row is still emitted first. +/// 2. Range tombstones are ordered by their end position. +/// 3. Clustered rows and range tombstones are emitted in descending order. +/// Because of 2 and 3 the guarantee that a range tombstone is emitted before +/// any mutation fragment affected by it still holds. +/// Ordering of partitions themselves remains unchanged. +/// +/// \param original the reader to be reversed, has to be kept alive while the +/// reversing reader is in use. +/// \param max_memory_consumption the maximum amount of memory the reader is +/// allowed to use for reversing. The reverse reader reads entire partitions +/// into memory, before reversing them. Since partitions can be larger than +/// the available memory, we need to enforce a limit on memory consumption. 
+/// If the read uses more memory then this limit, the read is aborted. +/// +/// FIXME: reversing should be done in the sstable layer, see #1413. +flat_mutation_reader +make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption); + /// Low level fragment stream validator. /// /// Tracks and validates the monotonicity of the passed in fragment kinds, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 2156605ac1..9b2b20a942 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -23,6 +23,7 @@ #include "service/priority_manager.hh" #include "multishard_mutation_query.hh" #include "database.hh" +#include "db/config.hh" #include @@ -220,6 +221,10 @@ public: read_context& operator=(read_context&&) = delete; read_context& operator=(const read_context&) = delete; + distributed& db() { + return _db; + } + virtual flat_mutation_reader create_reader( schema_ptr schema, const dht::partition_range& pr, @@ -604,8 +609,9 @@ static future do_query_mutations( return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] ( flat_mutation_reader& reader, lw_shared_ptr& compaction_state) mutable { auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter)); + auto& table = ctx->db().local().find_column_family(reader.schema()); return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.row_limit, cmd.partition_limit, cmd.timestamp, - timeout).then([&] (consume_result&& result) mutable { + timeout, table.get_config().max_memory_for_unlimited_query).then([&] (consume_result&& result) mutable { return make_ready_future(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state))); }); }).then_wrapped([&ctx] (future&& result_fut) { diff --git a/mutation_partition.cc b/mutation_partition.cc index 230b260d13..69348845ae 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc 
@@ -2168,6 +2168,7 @@ future<> data_query( uint32_t partition_limit, gc_clock::time_point query_time, query::result::builder& builder, + uint64_t max_memory_reverse_query, tracing::trace_state_ptr trace_ptr, db::timeout_clock::time_point timeout, query::querier_cache_context cache_ctx) @@ -2181,9 +2182,10 @@ future<> data_query( ? std::move(*querier_opt) : query::data_querier(source, s, range, slice, service::get_local_sstable_query_read_priority(), trace_ptr); - return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable { + return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), + cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable { auto qrb = query_result_builder(*s, builder); - return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout).then( + return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then( [=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable { if (q.are_limits_reached() || builder.is_short_read()) { cache_ctx.insert(std::move(q), std::move(trace_ptr)); @@ -2261,6 +2263,7 @@ static do_mutation_query(schema_ptr s, uint32_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, + uint64_t max_memory_reverse_query, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, db::timeout_clock::time_point timeout, @@ -2278,7 +2281,7 @@ static do_mutation_query(schema_ptr s, return do_with(std::move(q), [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] ( query::mutation_querier& q) mutable { auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter)); - return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout).then( + return q.consume_page(std::move(rrb), row_limit, 
partition_limit, query_time, timeout, max_memory_reverse_query).then( [=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable { if (q.are_limits_reached() || r.is_short_read()) { cache_ctx.insert(std::move(q), std::move(trace_ptr)); @@ -2300,13 +2303,14 @@ mutation_query(schema_ptr s, uint32_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, + uint64_t max_memory_reverse_query, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, db::timeout_clock::time_point timeout, query::querier_cache_context cache_ctx) { return do_mutation_query(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice), - row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx)); + row_limit, partition_limit, query_time, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx)); } deletable_row::deletable_row(clustering_row&& cr) @@ -2528,7 +2532,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so auto cwqrb = counter_write_query_result_builder(*s); auto cfq = make_stable_flattened_mutations_consumer>( *s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb)); - auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no); + auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout); return f.finally([r_a_r = std::move(r_a_r)] { }); } diff --git a/mutation_query.hh b/mutation_query.hh index af9493d9cb..7c8a235105 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -161,6 +161,7 @@ future mutation_query( uint32_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, + uint64_t max_memory_reverse_query, query::result_memory_accounter&& accounter = { }, tracing::trace_state_ptr trace_ptr = nullptr, db::timeout_clock::time_point timeout = db::no_timeout, @@ 
-175,6 +176,7 @@ future<> data_query( uint32_t partition_limit, gc_clock::time_point query_time, query::result::builder& builder, + uint64_t max_memory_reverse_query, tracing::trace_state_ptr trace_ptr = nullptr, db::timeout_clock::time_point timeout = db::no_timeout, query::querier_cache_context cache_ctx = { }); @@ -189,6 +191,7 @@ class mutation_query_stage { uint32_t, uint32_t, gc_clock::time_point, + uint64_t, query::result_memory_accounter&&, tracing::trace_state_ptr, db::timeout_clock::time_point, diff --git a/querier.cc b/querier.cc index 08408628d1..0f11b8f330 100644 --- a/querier.cc +++ b/querier.cc @@ -52,7 +52,7 @@ static sstring cannot_use_reason(can_use cu) static bool ring_position_matches(const schema& s, const dht::partition_range& range, const query::partition_slice& slice, const position_view& pos) { - const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(slice.options.contains(query::partition_slice::option::reversed)); + const auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); const auto expected_start = dht::ring_position_view(*pos.partition_key); // If there are no clustering columns or the select is distinct we don't @@ -93,7 +93,7 @@ static bool clustering_position_matches(const schema& s, const query::partition_ clustering_key_prefix::equality eq(s); - const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(slice.options.contains(query::partition_slice::option::reversed)); + const auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); // If the page ended mid-partition the first partition range should start // with the last clustering key (exclusive). 
diff --git a/querier.hh b/querier.hh index c00bc54728..d17cf23a19 100644 --- a/querier.hh +++ b/querier.hh @@ -84,7 +84,8 @@ auto consume_page(flat_mutation_reader& reader, uint32_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, - db::timeout_clock::time_point timeout) { + db::timeout_clock::time_point timeout, + size_t reverse_read_max_memory) { // FIXME: #3158 // consumer cannot be moved after consume_new_partition() is called // on it because it stores references to some of it's own members. @@ -95,15 +96,22 @@ auto consume_page(flat_mutation_reader& reader, const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end; compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, *consumer); - const auto is_reversed = flat_mutation_reader::consume_reversed_partitions( - slice.options.contains(query::partition_slice::option::reversed)); - auto last_ckey = make_lw_shared>(); auto reader_consumer = make_stable_flattened_mutations_consumer>>( compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); - return reader.consume(std::move(reader_consumer), timeout, is_reversed).then([last_ckey] (auto&&... results) mutable { + auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, reverse_read_max_memory] () mutable { + if (slice.options.contains(query::partition_slice::option::reversed)) { + return do_with(make_reversing_reader(reader, reverse_read_max_memory), + [reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable { + return reversing_reader.consume(std::move(reader_consumer), timeout); + }); + } + return reader.consume(std::move(reader_consumer), timeout); + }; + + return consume().then([last_ckey] (auto&&... 
results) mutable { static_assert(sizeof...(results) <= 1); return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); }); @@ -176,9 +184,10 @@ public: uint32_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, - db::timeout_clock::time_point timeout) { + db::timeout_clock::time_point timeout, + size_t reverse_read_max_memory) { return ::query::consume_page(_reader, _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time, - timeout).then([this] (auto&& results) { + timeout, reverse_read_max_memory).then([this] (auto&& results) { _last_ckey = std::get>(std::move(results)); constexpr auto size = std::tuple_size>::value; static_assert(size <= 2); diff --git a/table.cc b/table.cc index b146eb5704..61a40f5b6c 100644 --- a/table.cc +++ b/table.cc @@ -2397,7 +2397,7 @@ table::query(schema_ptr s, return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] { auto&& range = *qs.current_partition_range++; return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(), - qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state, timeout, cache_ctx); + qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, _config.max_memory_for_unlimited_query, trace_state, timeout, cache_ctx); }).then([qs_ptr = std::move(qs_ptr), &qs] { return make_ready_future>( make_lw_shared(qs.builder.build())); diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index a548ff51e7..7d7ce50e02 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -589,8 +589,11 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti assert(bool(!reversed)); return fmr.consume_in_thread(std::move(fsc), db::no_timeout); } else { - auto reversed_flag = 
flat_mutation_reader::consume_reversed_partitions(bool(reversed)); - return fmr.consume(std::move(fsc), db::no_timeout, reversed_flag).get0(); + if (reversed) { + auto reverse_reader = make_reversing_reader(fmr, size_t(1) << 20); + return reverse_reader.consume(std::move(fsc), db::no_timeout).get0(); + } + return fmr.consume(std::move(fsc), db::no_timeout).get0(); } }; @@ -784,3 +787,49 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_fragments_as_mutation_source) }; run_mutation_source_tests(populate); } + +SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { + simple_schema schema; + + struct phony_consumer { + void consume_new_partition(const dht::decorated_key&) { } + void consume(tombstone) { } + stop_iteration consume(static_row&&) { return stop_iteration::no; } + stop_iteration consume(clustering_row&&) { return stop_iteration::no; } + stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } + stop_iteration consume_end_of_partition() { return stop_iteration::no; } + void consume_end_of_stream() { } + }; + + auto test_with_partition = [&] (bool with_static_row) { + BOOST_TEST_MESSAGE(fmt::format("Testing with_static_row={}", with_static_row)); + auto mut = schema.new_mutation("pk1"); + const size_t desired_mut_size = 1 * 1024 * 1024; + const size_t row_size = 10 * 1024; + + if (with_static_row) { + schema.add_static_row(mut, "s1"); + } + + for (size_t i = 0; i < desired_mut_size / row_size; ++i) { + schema.add_row(mut, schema.make_ckey(++i), sstring(row_size, '0')); + } + + auto reader = flat_mutation_reader_from_mutations({mut}); + auto reverse_reader = make_reversing_reader(reader, size_t(1) << 10); + + try { + reverse_reader.consume(phony_consumer{}, db::no_timeout).get(); + BOOST_FAIL("No exception thrown for reversing overly big partition"); + } catch (const std::runtime_error& e) { + BOOST_TEST_MESSAGE(fmt::format("Got exception with message: {}", e.what())); + auto str = sstring(e.what()); + 
BOOST_REQUIRE_EQUAL(str.find("Aborting reverse partition read because partition pk1"), 0); + } catch (...) { + throw; + } + }; + + test_with_partition(true); + test_with_partition(false); +} diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index ec442b735b..d15ed88bd4 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -76,6 +76,7 @@ static query::partition_slice make_full_slice(const schema& s) { } static auto inf32 = std::numeric_limits::max(); +static const uint64_t max_memory_for_reverse_query = 1 << 20; query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) { return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32)); @@ -100,7 +101,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, query::max_partitions, now).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0(); // FIXME: use mutation assertions assert_that(to_result_set(result, s, slice)) @@ -123,7 +124,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0(); + query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -159,7 +160,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, query::max_partitions, now).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now, 
max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -173,7 +174,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, query::max_partitions, now + 2s).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now + 2s, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -206,7 +207,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -236,7 +237,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -264,7 +265,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 10, query::max_partitions, now).get0(); + query::full_partition_range, slice, 10, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -284,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, query::max_partitions, now).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0(); 
assert_that(to_result_set(result, s, slice)) .has_size(1) @@ -296,7 +297,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, query::max_partitions, now).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -323,7 +324,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, query::max_partitions, now).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -347,7 +348,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -369,7 +370,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -395,7 +396,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0(); + query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0(); 
assert_that(to_result_set(result, s, slice)) .is_empty(); @@ -445,7 +446,8 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) { auto query_time = now + std::chrono::seconds(1); - reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time).get0(); + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time, + max_memory_for_reverse_query).get0(); BOOST_REQUIRE_EQUAL(result.partitions().size(), 2); BOOST_REQUIRE_EQUAL(result.row_count(), 2); @@ -463,24 +465,29 @@ SEASTAR_TEST_CASE(test_result_row_count) { auto src = make_source({m1}); - auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32); + auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now, + max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 0); m1.set_static_cell("s1", data_value(bytes("S_v1")), 1); - r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32); + r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now, + max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 1); m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1); - r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32); + r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, 
slice, 10000, query::max_partitions, now,
+                max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
         BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
 
         m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
-        r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
+        r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
+                max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
         BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
 
         mutation m2(s, partition_key::from_single_value(*s, "key2"));
         m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
-        r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
+        r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now,
+                max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
         BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
     });
 }
@@ -503,7 +510,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
         {
             reconcilable_result result = mutation_query(s, src,
-                query::full_partition_range, slice, query::max_rows, 10, now).get0();
+                query::full_partition_range, slice, query::max_rows, 10, now, max_memory_for_reverse_query).get0();
 
             assert_that(to_result_set(result, s, slice))
                 .has_size(2)
@@ -519,7 +526,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
         {
             reconcilable_result result = mutation_query(s, src,
-                query::full_partition_range, slice, query::max_rows, 1, now).get0();
+                query::full_partition_range, slice, query::max_rows, 1, now, max_memory_for_reverse_query).get0();
 
             assert_that(to_result_set(result, s, slice))
                 .has_size(1)
@@ -541,10 +548,12 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
     slice.options.set();
     query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash},
             l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
-    data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
+    data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
+            gc_clock::now(), digest_only_builder, max_memory_for_reverse_query).get0();
 
     query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash},
             l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
-    data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
+    data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
+            gc_clock::now(), result_and_digest_builder, max_memory_for_reverse_query).get0();
     BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
 }
diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc
index 86135c4720..6408a0d438 100644
--- a/test/boost/mutation_test.cc
+++ b/test/boost/mutation_test.cc
@@ -2928,7 +2928,7 @@ void run_compaction_data_stream_split_test(const schema& schema, gc_clock::time_
             survived_compacted_fragments_consumer(schema, query_time, get_max_purgeable),
             purged_compacted_fragments_consumer(schema, query_time, get_max_purgeable));
 
-    auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no).get0();
+    auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer), db::no_timeout).get0();
 
     tlog.info("Survived data: {}", create_stats(survived_partitions));
     tlog.info("Purged data: {}", create_stats(purged_partitions));
diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc
index 26da41f27a..f7db2ce536 100644
--- a/test/boost/querier_cache_test.cc
+++ b/test/boost/querier_cache_test.cc
@@ -213,7 +213,7 @@ public:
         auto querier = make_querier(range);
 
         auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(),
-                gc_clock::now(), db::no_timeout).get0();
+                gc_clock::now(), db::no_timeout, std::numeric_limits<size_t>::max()).get0();
         const auto memory_usage = querier.memory_usage();
 
         _cache.insert(cache_key, std::move(querier), nullptr);
diff --git a/utils/updateable_value.cc b/utils/updateable_value.cc
index 640d642dc0..cf5ce40a13 100644
--- a/utils/updateable_value.cc
+++ b/utils/updateable_value.cc
@@ -82,6 +82,15 @@ updateable_value_base::operator=(updateable_value_base&& v) noexcept {
     return *this;
 }
 
+updateable_value_base&
+updateable_value_base::operator=(nullptr_t) {
+    if (_source) {
+        _source->del_ref(this);
+        _source = nullptr;
+    }
+    return *this;
+}
+
 void
 updateable_value_source_base::for_each_ref(std::function<void (updateable_value_base*)> func) {
     for (auto ref : _refs) {
diff --git a/utils/updateable_value.hh b/utils/updateable_value.hh
index c3297fd790..3adf8a6044 100644
--- a/utils/updateable_value.hh
+++ b/utils/updateable_value.hh
@@ -61,6 +61,7 @@ public:
     updateable_value_base& operator=(const updateable_value_base&);
     updateable_value_base(updateable_value_base&&) noexcept;
     updateable_value_base& operator=(updateable_value_base&&) noexcept;
+    updateable_value_base& operator=(nullptr_t);
 
     friend class updateable_value_source_base;
 };
@@ -78,6 +79,7 @@ public:
     explicit updateable_value(T value) : _value(std::move(value)) {}
     explicit updateable_value(const updateable_value_source<T>& source);
     updateable_value(const updateable_value& v);
+    updateable_value& operator=(T value);
     updateable_value& operator=(const updateable_value&);
     updateable_value(updateable_value&&) noexcept;
     updateable_value& operator=(updateable_value&&) noexcept;
@@ -170,6 +172,13 @@ template
 updateable_value<T>::updateable_value(const updateable_value& v) : updateable_value_base(v), _value(v._value) {
 }
 
+template <typename T>
+updateable_value<T>& updateable_value<T>::operator=(T value) {
+    updateable_value_base::operator=(nullptr);
+    _value = std::move(value);
+    return *this;
+}
+
 template <typename T>
 updateable_value<T>& updateable_value<T>::operator=(const updateable_value& v) {
     if (this != &v) {