diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 3d23ce0b65..b1000a06da 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -30,6 +30,7 @@ #include "tracing/trace_state.hh" #include "mutation.hh" #include "query_class_config.hh" +#include "mutation_consumer_concepts.hh" #include #include @@ -42,30 +43,6 @@ using seastar::future; class mutation_source; -template -concept FlatMutationReaderConsumer = - requires(Consumer c, mutation_fragment mf) { - { c(std::move(mf)) } -> std::same_as; - } || requires(Consumer c, mutation_fragment mf) { - { c(std::move(mf)) } -> std::same_as>; - }; - - -template -concept FlattenedConsumer = - StreamedMutationConsumer && requires(T obj, const dht::decorated_key& dk) { - { obj.consume_new_partition(dk) }; - { obj.consume_end_of_partition() }; - }; - -template -concept FlattenedConsumerFilter = - requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) { - { filter(dk) } -> std::same_as; - { filter(mf) } -> std::same_as; - { filter.on_end_of_stream() } -> std::same_as; - }; - /* * Allows iteration on mutations using mutation_fragments. 
* It iterates over mutations one by one and for each mutation diff --git a/mutation.cc b/mutation.cc index 4dc67e5e76..e025c67d5d 100644 --- a/mutation.cc +++ b/mutation.cc @@ -105,41 +105,6 @@ bool mutation::operator!=(const mutation& m) const { return !(*this == m); } -void -mutation::query(query::result::builder& builder, - const query::partition_slice& slice, - gc_clock::time_point now, - uint64_t row_limit) && -{ - auto pb = builder.add_partition(*schema(), key()); - auto is_reversed = slice.options.contains(); - auto always_return_static_content = slice.options.contains(); - mutation_partition& p = partition(); - auto limit = std::min(row_limit, slice.partition_row_limit()); - p.compact_for_query(*schema(), now, slice.row_ranges(*schema(), key()), always_return_static_content, is_reversed, limit); - p.query_compacted(pb, *schema(), limit); -} - -query::result -mutation::query(const query::partition_slice& slice, - query::result_memory_accounter&& accounter, - query::result_options opts, - gc_clock::time_point now, uint64_t row_limit) && -{ - query::result::builder builder(slice, opts, std::move(accounter)); - std::move(*this).query(builder, slice, now, row_limit); - return builder.build(); -} - -query::result -mutation::query(const query::partition_slice& slice, - query::result_memory_accounter&& accounter, - query::result_options opts, - gc_clock::time_point now, uint64_t row_limit) const& -{ - return mutation(*this).query(slice, std::move(accounter), opts, now, row_limit); -} - uint64_t mutation::live_row_count(gc_clock::time_point query_time) const { return partition().live_row_count(*schema(), query_time); diff --git a/mutation.hh b/mutation.hh index f8b7b284e6..0862ffdd69 100644 --- a/mutation.hh +++ b/mutation.hh @@ -29,9 +29,22 @@ #include "dht/i_partitioner.hh" #include "hashing.hh" #include "mutation_fragment.hh" +#include "mutation_consumer_concepts.hh" #include + +template +struct mutation_consume_result { + stop_iteration stop; + Result result; 
+}; + +template<> +struct mutation_consume_result { + stop_iteration stop; +}; + class mutation final { private: struct data { @@ -106,26 +119,11 @@ public: bool operator==(const mutation&) const; bool operator!=(const mutation&) const; public: - // The supplied partition_slice must be governed by this mutation's schema - query::result query(const query::partition_slice&, - query::result_memory_accounter&& accounter, - query::result_options opts = query::result_options::only_result(), - gc_clock::time_point now = gc_clock::now(), - uint64_t row_limit = query::max_rows) &&; - - // The supplied partition_slice must be governed by this mutation's schema - // FIXME: Slower than the r-value version - query::result query(const query::partition_slice&, - query::result_memory_accounter&& accounter, - query::result_options opts = query::result_options::only_result(), - gc_clock::time_point now = gc_clock::now(), - uint64_t row_limit = query::max_rows) const&; - - // The supplied partition_slice must be governed by this mutation's schema - void query(query::result::builder& builder, - const query::partition_slice& slice, - gc_clock::time_point now = gc_clock::now(), - uint64_t row_limit = query::max_rows) &&; + // Consumes the mutation's content. + // + // The mutation is left in a moved-from-like state after consumption. 
+ template + auto consume(Consumer& consumer) && -> mutation_consume_result; // See mutation_partition::live_row_count() uint64_t live_row_count(gc_clock::time_point query_time = gc_clock::time_point::min()) const; @@ -145,6 +143,58 @@ private: friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; +template +auto mutation::consume(Consumer& consumer) && -> mutation_consume_result { + consumer.consume_new_partition(_ptr->_dk); + + auto& partition = _ptr->_p; + + if (partition.partition_tombstone()) { + consumer.consume(partition.partition_tombstone()); + } + + stop_iteration stop = stop_iteration::no; + if (!partition.static_row().empty()) { + stop = consumer.consume(static_row(std::move(partition.static_row().get_existing()))); + } + + std::unique_ptr> cr(partition.clustered_rows().unlink_leftmost_without_rebalance()); + std::unique_ptr> rt(partition.row_tombstones().pop_front_and_lock()); + + position_in_partition::less_compare cmp_less(*_ptr->_schema); + + while (!stop && (cr || rt)) { + bool emit_rt; + if (rt && cr) { + emit_rt = cmp_less(rt->position(), cr->position()); + } else { + emit_rt = bool(rt); + } + if (emit_rt) { + stop = consumer.consume(std::move(*rt)); + rt.reset(partition.row_tombstones().pop_front_and_lock()); + } else { + stop = consumer.consume(clustering_row(std::move(*cr))); + cr.reset(partition.clustered_rows().unlink_leftmost_without_rebalance()); + } + } + while (cr) { + cr.reset(partition.clustered_rows().unlink_leftmost_without_rebalance()); + } + while (rt) { + rt.reset(partition.row_tombstones().pop_front_and_lock()); + } + + const auto stop_consuming = consumer.consume_end_of_partition(); + using consume_res_type = decltype(consumer.consume_end_of_stream()); + if constexpr (std::is_same_v) { + consumer.consume_end_of_stream(); + return mutation_consume_result{stop_consuming}; + } else { + return mutation_consume_result{stop_consuming, consumer.consume_end_of_stream()}; + } +} + struct mutation_equals_by_key { bool 
operator()(const mutation& m1, const mutation& m2) const { return m1.schema() == m2.schema() diff --git a/mutation_compactor.hh b/mutation_compactor.hh index fd7a662226..d284e649b4 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -291,6 +291,10 @@ public: // We are passing only dead (purged) data so pass is_live=false. gc_consumer.consume(std::move(sr_garbage), current_tombstone, false); }); + } else { + if (can_purge_tombstone(current_tombstone)) { + current_tombstone = {}; + } } _static_row_live = is_live; if (is_live || (!only_live() && !sr.empty())) { @@ -333,6 +337,10 @@ public: // We are passing only dead (purged) data so pass is_live=false. gc_consumer.consume(std::move(cr_garbage), t, false); }); + } else { + if (can_purge_tombstone(t)) { + t = {}; + } } if (only_live() && is_live) { diff --git a/mutation_consumer_concepts.hh b/mutation_consumer_concepts.hh new file mode 100644 index 0000000000..3e030a0350 --- /dev/null +++ b/mutation_consumer_concepts.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#pragma once +#include "mutation_fragment.hh" + +template +concept FlatMutationReaderConsumer = + requires(Consumer c, mutation_fragment mf) { + { c(std::move(mf)) } -> std::same_as; + } || requires(Consumer c, mutation_fragment mf) { + { c(std::move(mf)) } -> std::same_as>; + }; + + +template +concept FlattenedConsumer = + StreamedMutationConsumer && requires(T obj, const dht::decorated_key& dk) { + { obj.consume_new_partition(dk) }; + { obj.consume_end_of_partition() }; + }; + +template +concept FlattenedConsumerFilter = + requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) { + { filter(dk) } -> std::same_as; + { filter(mf) } -> std::same_as; + { filter.on_end_of_stream() } -> std::same_as; + }; + diff --git a/mutation_partition.cc b/mutation_partition.cc index dcec5770c0..bc9de2188e 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -908,94 +908,6 @@ bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tomb return any_live; } -void -mutation_partition::query_compacted(query::result::partition_writer& pw, const schema& s, uint64_t limit) const { - check_schema(s); - const query::partition_slice& slice = pw.slice(); - max_timestamp max_ts{pw.last_modified()}; - - if (limit == 0) { - pw.retract(); - return; - } - - auto static_cells_wr = pw.start().start_static_row().start_cells(); - - if (!slice.static_columns.empty()) { - if (pw.requested_result()) { - get_compacted_row_slice(s, slice, column_kind::static_column, static_row().get(), slice.static_columns, static_cells_wr); - } - if (pw.requested_digest()) { - auto pt = partition_tombstone(); - pw.digest().feed_hash(pt); - max_ts.update(pt.timestamp); - pw.digest().feed_hash(static_row().get(), s, column_kind::static_column, slice.static_columns, max_ts); - } - } - - auto rows_wr = std::move(static_cells_wr).end_cells() - .end_static_row() - .start_rows(); - - uint64_t row_count = 0; - - auto is_reversed = 
slice.options.contains(query::partition_slice::option::reversed); - auto send_ck = slice.options.contains(query::partition_slice::option::send_clustering_key); - for_each_row(s, query::clustering_range::make_open_ended_both_sides(), is_reversed, [&] (const rows_entry& e) { - if (e.dummy()) { - return stop_iteration::no; - } - auto& row = e.row(); - auto row_tombstone = tombstone_for_row(s, e); - - if (pw.requested_digest()) { - pw.digest().feed_hash(e.key(), s); - pw.digest().feed_hash(row_tombstone); - max_ts.update(row_tombstone.tomb().timestamp); - pw.digest().feed_hash(row.cells(), s, column_kind::regular_column, slice.regular_columns, max_ts); - } - - if (row.is_live(s)) { - if (pw.requested_result()) { - auto cells_wr = [&] { - if (send_ck) { - return rows_wr.add().write_key(e.key()).start_cells().start_cells(); - } else { - return rows_wr.add().skip_key().start_cells().start_cells(); - } - }(); - get_compacted_row_slice(s, slice, column_kind::regular_column, row.cells(), slice.regular_columns, cells_wr); - std::move(cells_wr).end_cells().end_cells().end_qr_clustered_row(); - } - ++row_count; - if (--limit == 0) { - return stop_iteration::yes; - } - } - return stop_iteration::no; - }); - - pw.last_modified() = max_ts.max; - - // If we got no rows, but have live static columns, we should only - // give them back IFF we did not have any CK restrictions. - // #589 - // If ck:s exist, and we do a restriction on them, we either have maching - // rows, or return nothing, since cql does not allow "is null". - bool return_static_content_on_partition_with_no_rows = - pw.slice().options.contains(query::partition_slice::option::always_return_static_content) || - !has_ck_selector(pw.ranges()); - if (row_count == 0 - && (!return_static_content_on_partition_with_no_rows - || !has_any_live_data(s, column_kind::static_column, static_row().get()))) { - pw.retract(); - } else { - pw.row_count() += row_count ? 
: 1; - pw.partition_count() += 1; - std::move(rows_wr).end_rows().end_qr_partition(); - } -} - std::ostream& operator<<(std::ostream& os, const std::pair& c) { return fmt_print(os, "{{column: {} {}}}", c.first, c.second); @@ -2359,6 +2271,35 @@ reconcilable_result reconcilable_result_builder::consume_end_of_stream() { std::move(_memory_accounter).done()); } +query::result +to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions, + query::result_options opts) { + // This result was already built with a limit, don't apply another one. + query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); + auto consumer = compact_for_query(*s, gc_clock::time_point::min(), slice, max_rows, + max_partitions, query_result_builder(*s, builder)); + + for (const partition& p : r.partitions()) { + const auto res = p.mut().unfreeze(s).consume(consumer); + if (res.stop == stop_iteration::yes) { + break; + } + } + if (r.is_short_read()) { + builder.mark_as_short_read(); + } + return builder.build(); +} + +query::result +query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) { + query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); + auto consumer = compact_for_query(*m.schema(), now, slice, row_limit, + query::max_partitions, query_result_builder(*m.schema(), builder)); + std::move(m).consume(consumer); + return builder.build(); +} + future static do_mutation_query(schema_ptr s, mutation_source source, diff --git a/mutation_partition.hh b/mutation_partition.hh index 88e3e0af48..bb0ef14c78 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -1479,11 +1479,6 @@ public: return boost::make_iterator_range(_rows.begin(), _rows.end()) | 
boost::adaptors::filtered([] (const rows_entry& e) { return bool(!e.dummy()); }); } - // Writes this partition using supplied query result writer. - // The partition should be first compacted with compact_for_query(), otherwise - // results may include data which is deleted/expired. - // At most row_limit CQL rows will be written and digested. - void query_compacted(query::result::partition_writer& pw, const schema& s, uint64_t row_limit) const; void accept(const schema&, mutation_partition_visitor&) const; // Returns the number of live CQL rows in this partition. diff --git a/mutation_query.cc b/mutation_query.cc index e4e98567a7..90e6fa0430 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -63,23 +63,6 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const { return !(*this == other); } -query::result -to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions, query::result_options opts) { - // This result was already built with a limit, don't apply another one. - query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - for (const partition& p : r.partitions()) { - if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) { - break; - } - // Also enforces the per-partition limit. 
- p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), max_rows - builder.row_count()); - } - if (r.is_short_read()) { - builder.mark_as_short_read(); - } - return builder.build(); -} - std::ostream& operator<<(std::ostream& out, const reconcilable_result::printer& pr) { out << "{rows=" << pr.self.row_count() << ", short_read=" << pr.self.is_short_read() << ", ["; diff --git a/mutation_query.hh b/mutation_query.hh index d1cdc33dfd..b32ca06271 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -166,7 +166,23 @@ public: reconcilable_result consume_end_of_stream(); }; -query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint64_t row_limit, uint32_t partition_limit, query::result_options opts = query::result_options::only_result()); +query::result to_data_query_result( + const reconcilable_result&, + schema_ptr, + const query::partition_slice&, + uint64_t row_limit, + uint32_t partition_limit, + query::result_options opts = query::result_options::only_result()); + +// Query the content of the mutation. +// +// The mutation is destroyed in the process, see `mutation::consume()`. +query::result query_mutation( + mutation&& m, + const query::partition_slice& slice, + uint64_t row_limit = query::max_rows, + gc_clock::time_point now = gc_clock::now(), + query::result_options opts = query::result_options::only_result()); // Performs a query on given data source returning data in reconcilable form. 
// diff --git a/query-result-set.cc b/query-result-set.cc index e221f14ef7..15d61d015f 100644 --- a/query-result-set.cc +++ b/query-result-set.cc @@ -25,6 +25,7 @@ #include "mutation.hh" #include "types/map.hh" #include "utils/exceptions.hh" +#include "mutation_query.hh" #include @@ -221,7 +222,7 @@ result_set::from_raw_result(schema_ptr s, const partition_slice& slice, const re result_set::result_set(const mutation& m) : result_set([&m] { auto slice = partition_slice_builder(*m.schema()).build(); - auto qr = mutation(m).query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, result_options::only_result()); + auto qr = query_mutation(mutation(m), slice); return result_set::from_raw_result(m.schema(), slice, qr); }()) { } diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index ca11e4125e..bd76121edc 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -770,8 +770,7 @@ SEASTAR_TEST_CASE(test_querying_of_mutation) { auto resultify = [s] (const mutation& m) -> query::result_set { auto slice = make_full_slice(*s); - return query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size })); + return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice)); }; mutation m(s, partition_key::from_single_value(*s, "key1")); @@ -806,8 +805,7 @@ SEASTAR_TEST_CASE(test_partition_with_no_live_data_is_absent_in_data_query_resul auto slice = make_full_slice(*s); - assert_that(query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }))) + assert_that(query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice))) .is_empty(); }); } @@ -831,7 +829,7 @@ SEASTAR_TEST_CASE(test_partition_with_live_data_in_static_row_is_present_in_the_ .build(); 
assert_that(query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }))) + query_mutation(mutation(m), slice))) .has_only(a_row() .with_column("pk", data_value(bytes("key1"))) .with_column("v", data_value::make_null(bytes_type))); @@ -855,7 +853,7 @@ SEASTAR_TEST_CASE(test_query_result_with_one_regular_column_missing) { auto slice = partition_slice_builder(*s).build(); assert_that(query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }))) + query_mutation(mutation(m), slice))) .has_only(a_row() .with_column("pk", data_value(bytes("key1"))) .with_column("ck", data_value(bytes("ck:A"))) @@ -1236,10 +1234,11 @@ SEASTAR_TEST_CASE(test_query_digest) { auto check_digests_equal = [] (const mutation& m1, const mutation& m2) { auto ps1 = partition_slice_builder(*m1.schema()).build(); auto ps2 = partition_slice_builder(*m2.schema()).build(); - auto digest1 = *m1.query(ps1, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, + auto digest1 = *query_mutation(mutation(m1), ps1, query::max_rows, gc_clock::now(), query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - auto digest2 = *m2.query(ps2, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, + auto digest2 = *query_mutation( mutation(m2), ps2, query::max_rows, gc_clock::now(), query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); + if (digest1 != digest2) { BOOST_FAIL(format("Digest should be the same for {} and {}", m1, m2)); } @@ -1488,8 +1487,7 @@ SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) { .without_partition_key_columns() .build(); auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}; - return query::result_set::from_raw_result(s, slice, - m.query(slice, 
query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, opts, t)); + return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice, query::max_rows, t, opts)); }; mutation m(s, pk); @@ -1553,8 +1551,7 @@ SEASTAR_TEST_CASE(test_querying_expired_cells) { .without_partition_key_columns() .build(); auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}; - return query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, opts, t)); + return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice, query::max_rows, t, opts)); }; { diff --git a/test/perf/memory_footprint_test.cc b/test/perf/memory_footprint_test.cc index 92cba64ae2..8e48526b13 100644 --- a/test/perf/memory_footprint_test.cc +++ b/test/perf/memory_footprint_test.cc @@ -196,8 +196,7 @@ static sizes calculate_sizes(cache_tracker& tracker, const mutation_settings& se result.cache = tracker.region().occupancy().used_space() - cache_initial_occupancy; result.frozen = freeze(m).representation().size(); result.canonical = canonical_mutation(m).representation().size(); - result.query_result = m.query(partition_slice_builder(*s).build(), - query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::result_options::only_result()).buf().size(); + result.query_result = query_mutation(mutation(m), partition_slice_builder(*s).build()).buf().size(); tmpdir sstable_dir; sstables::test_env::do_with_async([&] (sstables::test_env& env) {