From 73808c12ebf8684e0151ee4dbd2eff63bf6a5b15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 18 Jan 2021 13:59:24 +0200 Subject: [PATCH 1/9] mutation compactor: query compaction: ignore purgeable tombstones This behaviour makes query result building sensitive to whether the data was recently compacted or not, in particular different digests will be produced depending on whether purgeable tombstones happened to be compacted (and thus purged) or not. This means that two replicas can produce different digests for the same data if one has compacted some purgeable tombstones and the other has not. To avoid this, drop purgeable tombstones during query compaction as well. --- mutation_compactor.hh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/mutation_compactor.hh b/mutation_compactor.hh index fd7a662226..d284e649b4 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -291,6 +291,10 @@ public: // We are passing only dead (purged) data so pass is_live=false. gc_consumer.consume(std::move(sr_garbage), current_tombstone, false); }); + } else { + if (can_purge_tombstone(current_tombstone)) { + current_tombstone = {}; + } } _static_row_live = is_live; if (is_live || (!only_live() && !sr.empty())) { @@ -333,6 +337,10 @@ public: // We are passing only dead (purged) data so pass is_live=false. gc_consumer.consume(std::move(cr_garbage), t, false); }); + } else { + if (can_purge_tombstone(t)) { + t = {}; + } } if (only_live() && is_live) { From 9153f631359791e28321ea91aad2b85e7c0c50a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 15 Jan 2021 09:34:00 +0200 Subject: [PATCH 2/9] flat_mutation_reader: move mutation consumer concepts to separate header In the next patch we will want to use these concepts in `mutation.hh`. To avoid pulling in the entire `flat_mutation_reader.hh` just for these, and creating a circular dependency in doing so, move them to a dedicated header instead. 
--- flat_mutation_reader.hh | 25 +----------------- mutation_consumer_concepts.hh | 48 +++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 24 deletions(-) create mode 100644 mutation_consumer_concepts.hh diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 3d23ce0b65..b1000a06da 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -30,6 +30,7 @@ #include "tracing/trace_state.hh" #include "mutation.hh" #include "query_class_config.hh" +#include "mutation_consumer_concepts.hh" #include #include @@ -42,30 +43,6 @@ using seastar::future; class mutation_source; -template -concept FlatMutationReaderConsumer = - requires(Consumer c, mutation_fragment mf) { - { c(std::move(mf)) } -> std::same_as; - } || requires(Consumer c, mutation_fragment mf) { - { c(std::move(mf)) } -> std::same_as>; - }; - - -template -concept FlattenedConsumer = - StreamedMutationConsumer && requires(T obj, const dht::decorated_key& dk) { - { obj.consume_new_partition(dk) }; - { obj.consume_end_of_partition() }; - }; - -template -concept FlattenedConsumerFilter = - requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) { - { filter(dk) } -> std::same_as; - { filter(mf) } -> std::same_as; - { filter.on_end_of_stream() } -> std::same_as; - }; - /* * Allows iteration on mutations using mutation_fragments. * It iterates over mutations one by one and for each mutation diff --git a/mutation_consumer_concepts.hh b/mutation_consumer_concepts.hh new file mode 100644 index 0000000000..3e030a0350 --- /dev/null +++ b/mutation_consumer_concepts.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once +#include "mutation_fragment.hh" + +template +concept FlatMutationReaderConsumer = + requires(Consumer c, mutation_fragment mf) { + { c(std::move(mf)) } -> std::same_as; + } || requires(Consumer c, mutation_fragment mf) { + { c(std::move(mf)) } -> std::same_as>; + }; + + +template +concept FlattenedConsumer = + StreamedMutationConsumer && requires(T obj, const dht::decorated_key& dk) { + { obj.consume_new_partition(dk) }; + { obj.consume_end_of_partition() }; + }; + +template +concept FlattenedConsumerFilter = + requires(T filter, const dht::decorated_key& dk, const mutation_fragment& mf) { + { filter(dk) } -> std::same_as; + { filter(mf) } -> std::same_as; + { filter.on_end_of_stream() } -> std::same_as; + }; + From d0c5f550a9349ca217ced1949af40d129cef0575 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 14 Jan 2021 19:03:38 +0200 Subject: [PATCH 3/9] mutation: add consume() This consume method accepts a `FlattenedConsumer`, the same one that the name-sake `flat_mutation_reader::consume()` does. Indeed the main purpose of this method is to allow using the standard query result building stack with a mutation, the same way said stack is used with mutation readers currently. This will allow us to replace the parallel query result building code that currently exists in the `mutation::query()` and friends, with the standard one. 
--- mutation.hh | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/mutation.hh b/mutation.hh index f8b7b284e6..e3436d8d71 100644 --- a/mutation.hh +++ b/mutation.hh @@ -29,9 +29,22 @@ #include "dht/i_partitioner.hh" #include "hashing.hh" #include "mutation_fragment.hh" +#include "mutation_consumer_concepts.hh" #include + +template +struct mutation_consume_result { + stop_iteration stop; + Result result; +}; + +template<> +struct mutation_consume_result { + stop_iteration stop; +}; + class mutation final { private: struct data { @@ -127,6 +140,12 @@ public: gc_clock::time_point now = gc_clock::now(), uint64_t row_limit = query::max_rows) &&; + // Consumes the mutation's content. + // + // The mutation is in a moved-from alike state after consumption. + template + auto consume(Consumer& consumer) && -> mutation_consume_result; + // See mutation_partition::live_row_count() uint64_t live_row_count(gc_clock::time_point query_time = gc_clock::time_point::min()) const; @@ -145,6 +164,58 @@ private: friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; +template +auto mutation::consume(Consumer& consumer) && -> mutation_consume_result { + consumer.consume_new_partition(_ptr->_dk); + + auto& partition = _ptr->_p; + + if (partition.partition_tombstone()) { + consumer.consume(partition.partition_tombstone()); + } + + stop_iteration stop = stop_iteration::no; + if (!partition.static_row().empty()) { + stop = consumer.consume(static_row(std::move(partition.static_row().get_existing()))); + } + + std::unique_ptr> cr(partition.clustered_rows().unlink_leftmost_without_rebalance()); + std::unique_ptr> rt(partition.row_tombstones().pop_front_and_lock()); + + position_in_partition::less_compare cmp_less(*_ptr->_schema); + + while (!stop && (cr || rt)) { + bool emit_rt; + if (rt && cr) { + emit_rt = cmp_less(rt->position(), cr->position()); + } else { + emit_rt = bool(rt); + } + if (emit_rt) { + stop = 
consumer.consume(std::move(*rt)); + rt.reset(partition.row_tombstones().pop_front_and_lock()); + } else { + stop = consumer.consume(clustering_row(std::move(*cr))); + cr.reset(partition.clustered_rows().unlink_leftmost_without_rebalance()); + } + } + while (cr) { + cr.reset(partition.clustered_rows().unlink_leftmost_without_rebalance()); + } + while (rt) { + rt.reset(partition.row_tombstones().pop_front_and_lock()); + } + + const auto stop_consuming = consumer.consume_end_of_partition(); + using consume_res_type = decltype(consumer.consume_end_of_stream()); + if constexpr (std::is_same_v) { + consumer.consume_end_of_stream(); + return mutation_consume_result{stop_consuming}; + } else { + return mutation_consume_result{stop_consuming, consumer.consume_end_of_stream()}; + } +} + struct mutation_equals_by_key { bool operator()(const mutation& m1, const mutation& m2) const { return m1.schema() == m2.schema() From 164582f33b8002118e78b1809a19a9814f84fdbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 13 Jan 2021 13:09:13 +0200 Subject: [PATCH 4/9] mutation_query: move to_data_query_result() to mutation_partition.cc We want to rewrite the above mentioned method's implementation in terms of the standard query result building code (that of the `data_query()` path), in order to retire the alternative query code in the mutation class. The `data_query()` code uses classes private to `mutation_partition.cc` and instead of making these public, just move `to_data_query_result()` to `mutation_partition.cc`. 
--- mutation_partition.cc | 17 +++++++++++++++++ mutation_query.cc | 17 ----------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index dcec5770c0..7f265cab3d 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2359,6 +2359,23 @@ reconcilable_result reconcilable_result_builder::consume_end_of_stream() { std::move(_memory_accounter).done()); } +query::result +to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions, query::result_options opts) { + // This result was already built with a limit, don't apply another one. + query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); + for (const partition& p : r.partitions()) { + if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) { + break; + } + // Also enforces the per-partition limit. + p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), max_rows - builder.row_count()); + } + if (r.is_short_read()) { + builder.mark_as_short_read(); + } + return builder.build(); +} + future static do_mutation_query(schema_ptr s, mutation_source source, diff --git a/mutation_query.cc b/mutation_query.cc index e4e98567a7..90e6fa0430 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -63,23 +63,6 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const { return !(*this == other); } -query::result -to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions, query::result_options opts) { - // This result was already built with a limit, don't apply another one. 
- query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - for (const partition& p : r.partitions()) { - if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) { - break; - } - // Also enforces the per-partition limit. - p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), max_rows - builder.row_count()); - } - if (r.is_short_read()) { - builder.mark_as_short_read(); - } - return builder.build(); -} - std::ostream& operator<<(std::ostream& out, const reconcilable_result::printer& pr) { out << "{rows=" << pr.self.row_count() << ", short_read=" << pr.self.is_short_read() << ", ["; From c4f12221b80803b37d38d83a819943de3b769e25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 15 Jan 2021 09:39:42 +0200 Subject: [PATCH 5/9] mutation_query: to_data_query_result(): migrate to standard query code Reimplement in terms of the standard query result building code. We want to retire the alternative query result code in `mutation::query()` and `to_data_query_result()` is one of the main users. --- mutation_partition.cc | 11 +++++++---- mutation_query.hh | 8 +++++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index 7f265cab3d..669b16ff2f 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2360,15 +2360,18 @@ reconcilable_result reconcilable_result_builder::consume_end_of_stream() { } query::result -to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions, query::result_options opts) { +to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions, + query::result_options opts) { // This result was already built with a limit, don't apply another one. 
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); + auto consumer = compact_for_query(*s, gc_clock::time_point::min(), slice, max_rows, + max_partitions, query_result_builder(*s, builder)); + for (const partition& p : r.partitions()) { - if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) { + const auto res = p.mut().unfreeze(s).consume(consumer); + if (res.stop == stop_iteration::yes) { break; } - // Also enforces the per-partition limit. - p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), max_rows - builder.row_count()); } if (r.is_short_read()) { builder.mark_as_short_read(); diff --git a/mutation_query.hh b/mutation_query.hh index d1cdc33dfd..b1db159870 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -166,7 +166,13 @@ public: reconcilable_result consume_end_of_stream(); }; -query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint64_t row_limit, uint32_t partition_limit, query::result_options opts = query::result_options::only_result()); +query::result to_data_query_result( + const reconcilable_result&, + schema_ptr, + const query::partition_slice&, + uint64_t row_limit, + uint32_t partition_limit, + query::result_options opts = query::result_options::only_result()); // Performs a query on given data source returning data in reconcilable form. // From 821ed96e0ea82e62b151b9618d2b6ef6f46e3c3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 15 Jan 2021 11:49:47 +0200 Subject: [PATCH 6/9] mutation_query: introduce query_mutation() This is a replacement of `mutation::query()`, but with an implementation based on the standard query result building code. This will allow us to migrate the remaining `mutation::query()` users off of said method, which in turn will allow us to retire it finally. 
--- mutation_partition.cc | 9 +++++++++ mutation_query.hh | 10 ++++++++++ 2 files changed, 19 insertions(+) diff --git a/mutation_partition.cc b/mutation_partition.cc index 669b16ff2f..6edf60d548 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2379,6 +2379,15 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa return builder.build(); } +query::result +query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) { + query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); + auto consumer = compact_for_query(*m.schema(), now, slice, row_limit, + query::max_partitions, query_result_builder(*m.schema(), builder)); + std::move(m).consume(consumer); + return builder.build(); +} + future static do_mutation_query(schema_ptr s, mutation_source source, diff --git a/mutation_query.hh b/mutation_query.hh index b1db159870..b32ca06271 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -174,6 +174,16 @@ query::result to_data_query_result( uint32_t partition_limit, query::result_options opts = query::result_options::only_result()); +// Query the content of the mutation. +// +// The mutation is destroyed in the process, see `mutation::consume()`. +query::result query_mutation( + mutation&& m, + const query::partition_slice& slice, + uint64_t row_limit = query::max_rows, + gc_clock::time_point now = gc_clock::now(), + query::result_options opts = query::result_options::only_result()); + // Performs a query on given data source returning data in reconcilable form. // // Reads at most row_limit rows. 
If less rows are returned, the data source From a9d726c7ba684c03e8b353ff06fba375b3e57967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 18 Jan 2021 14:02:00 +0200 Subject: [PATCH 7/9] mutation_test: test_query_digest: ensure digest is produced consistently Before we retire the mutation::query() code, expand the digest test to check that the new code replacing it produces identical digest on all possible equivalent mutations. --- test/boost/mutation_test.cc | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 5cb562f1a5..a9e6f0b751 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -1236,12 +1236,26 @@ SEASTAR_TEST_CASE(test_query_digest) { auto check_digests_equal = [] (const mutation& m1, const mutation& m2) { auto ps1 = partition_slice_builder(*m1.schema()).build(); auto ps2 = partition_slice_builder(*m2.schema()).build(); - auto digest1 = *m1.query(ps1, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, + auto digest1_old = *m1.query(ps1, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - auto digest2 = *m2.query(ps2, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, + auto digest1_new = *query_mutation(mutation(m1), ps1, query::max_rows, gc_clock::now(), query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - if (digest1 != digest2) { - BOOST_FAIL(format("Digest should be the same for {} and {}", m1, m2)); + auto digest2_old = *m2.query(ps2, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, + query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); + auto digest2_new = *query_mutation(mutation(m2), ps2, query::max_rows, gc_clock::now(), + 
query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); + + if (digest1_old != digest1_new) { + BOOST_FAIL(format("Digest should be the same with old and new method for {}", m1)); + } + if (digest2_old != digest2_new) { + BOOST_FAIL(format("Digest should be the same with old and new method for {}", m2)); + } + if (digest1_old != digest2_old) { + BOOST_FAIL(format("Digest (old) should be the same for {} and {}", m1, m2)); + } + if (digest1_new != digest2_new) { + BOOST_FAIL(format("Digest (new) should be the same for {} and {}", m1, m2)); } }; From 1a3ee71b39ad1c91c155d8cdfb44324e8f9ef184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 15 Jan 2021 11:49:11 +0200 Subject: [PATCH 8/9] treewide: use query_mutation() instead of mutation::query() We want to retire the latter. --- query-result-set.cc | 3 ++- test/boost/mutation_test.cc | 37 ++++++++---------------------- test/perf/memory_footprint_test.cc | 3 +-- 3 files changed, 13 insertions(+), 30 deletions(-) diff --git a/query-result-set.cc b/query-result-set.cc index e221f14ef7..15d61d015f 100644 --- a/query-result-set.cc +++ b/query-result-set.cc @@ -25,6 +25,7 @@ #include "mutation.hh" #include "types/map.hh" #include "utils/exceptions.hh" +#include "mutation_query.hh" #include @@ -221,7 +222,7 @@ result_set::from_raw_result(schema_ptr s, const partition_slice& slice, const re result_set::result_set(const mutation& m) : result_set([&m] { auto slice = partition_slice_builder(*m.schema()).build(); - auto qr = mutation(m).query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, result_options::only_result()); + auto qr = query_mutation(mutation(m), slice); return result_set::from_raw_result(m.schema(), slice, qr); }()) { } diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index a9e6f0b751..663f8f9146 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -770,8 +770,7 @@ 
SEASTAR_TEST_CASE(test_querying_of_mutation) { auto resultify = [s] (const mutation& m) -> query::result_set { auto slice = make_full_slice(*s); - return query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size })); + return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice)); }; mutation m(s, partition_key::from_single_value(*s, "key1")); @@ -806,8 +805,7 @@ SEASTAR_TEST_CASE(test_partition_with_no_live_data_is_absent_in_data_query_resul auto slice = make_full_slice(*s); - assert_that(query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }))) + assert_that(query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice))) .is_empty(); }); } @@ -831,7 +829,7 @@ SEASTAR_TEST_CASE(test_partition_with_live_data_in_static_row_is_present_in_the_ .build(); assert_that(query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }))) + query_mutation(mutation(m), slice))) .has_only(a_row() .with_column("pk", data_value(bytes("key1"))) .with_column("v", data_value::make_null(bytes_type))); @@ -855,7 +853,7 @@ SEASTAR_TEST_CASE(test_query_result_with_one_regular_column_missing) { auto slice = partition_slice_builder(*s).build(); assert_that(query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }))) + query_mutation(mutation(m), slice))) .has_only(a_row() .with_column("pk", data_value(bytes("key1"))) .with_column("ck", data_value(bytes("ck:A"))) @@ -1236,26 +1234,13 @@ SEASTAR_TEST_CASE(test_query_digest) { auto check_digests_equal = [] (const mutation& m1, const mutation& m2) { auto ps1 = partition_slice_builder(*m1.schema()).build(); auto ps2 = 
partition_slice_builder(*m2.schema()).build(); - auto digest1_old = *m1.query(ps1, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, + auto digest1 = *query_mutation(mutation(m1), ps1, query::max_rows, gc_clock::now(), query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - auto digest1_new = *query_mutation(mutation(m1), ps1, query::max_rows, gc_clock::now(), - query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - auto digest2_old = *m2.query(ps2, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, - query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - auto digest2_new = *query_mutation(mutation(m2), ps2, query::max_rows, gc_clock::now(), + auto digest2 = *query_mutation( mutation(m2), ps2, query::max_rows, gc_clock::now(), query::result_options::only_digest(query::digest_algorithm::xxHash)).digest(); - if (digest1_old != digest1_new) { - BOOST_FAIL(format("Digest should be the same with old and new method for {}", m1)); - } - if (digest2_old != digest2_new) { - BOOST_FAIL(format("Digest should be the same with old and new method for {}", m2)); - } - if (digest1_old != digest2_old) { - BOOST_FAIL(format("Digest (old) should be the same for {} and {}", m1, m2)); - } - if (digest1_new != digest2_new) { - BOOST_FAIL(format("Digest (new) should be the same for {} and {}", m1, m2)); + if (digest1 != digest2) { + BOOST_FAIL(format("Digest should be the same for {} and {}", m1, m2)); } }; @@ -1502,8 +1487,7 @@ SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) { .without_partition_key_columns() .build(); auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}; - return query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, opts, t)); + return 
query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice, query::max_rows, t, opts)); }; mutation m(s, pk); @@ -1567,8 +1551,7 @@ SEASTAR_TEST_CASE(test_querying_expired_cells) { .without_partition_key_columns() .build(); auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}; - return query::result_set::from_raw_result(s, slice, - m.query(slice, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, opts, t)); + return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice, query::max_rows, t, opts)); }; { diff --git a/test/perf/memory_footprint_test.cc b/test/perf/memory_footprint_test.cc index 92cba64ae2..8e48526b13 100644 --- a/test/perf/memory_footprint_test.cc +++ b/test/perf/memory_footprint_test.cc @@ -196,8 +196,7 @@ static sizes calculate_sizes(cache_tracker& tracker, const mutation_settings& se result.cache = tracker.region().occupancy().used_space() - cache_initial_occupancy; result.frozen = freeze(m).representation().size(); result.canonical = canonical_mutation(m).representation().size(); - result.query_result = m.query(partition_slice_builder(*s).build(), - query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::result_options::only_result()).buf().size(); + result.query_result = query_mutation(mutation(m), partition_slice_builder(*s).build()).buf().size(); tmpdir sstable_dir; sstables::test_env::do_with_async([&] (sstables::test_env& env) { From 9c96d74b7240fd9087792510d7c054594a5adeea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 18 Jan 2021 14:25:11 +0200 Subject: [PATCH 9/9] mutation: remove now unused query() and query_compacted() --- mutation.cc | 35 ----------------- mutation.hh | 21 ----------- mutation_partition.cc | 88 ------------------------------------------- mutation_partition.hh | 5 --- 4 files changed, 149 deletions(-) diff --git 
a/mutation.cc b/mutation.cc index 4dc67e5e76..e025c67d5d 100644 --- a/mutation.cc +++ b/mutation.cc @@ -105,41 +105,6 @@ bool mutation::operator!=(const mutation& m) const { return !(*this == m); } -void -mutation::query(query::result::builder& builder, - const query::partition_slice& slice, - gc_clock::time_point now, - uint64_t row_limit) && -{ - auto pb = builder.add_partition(*schema(), key()); - auto is_reversed = slice.options.contains(); - auto always_return_static_content = slice.options.contains(); - mutation_partition& p = partition(); - auto limit = std::min(row_limit, slice.partition_row_limit()); - p.compact_for_query(*schema(), now, slice.row_ranges(*schema(), key()), always_return_static_content, is_reversed, limit); - p.query_compacted(pb, *schema(), limit); -} - -query::result -mutation::query(const query::partition_slice& slice, - query::result_memory_accounter&& accounter, - query::result_options opts, - gc_clock::time_point now, uint64_t row_limit) && -{ - query::result::builder builder(slice, opts, std::move(accounter)); - std::move(*this).query(builder, slice, now, row_limit); - return builder.build(); -} - -query::result -mutation::query(const query::partition_slice& slice, - query::result_memory_accounter&& accounter, - query::result_options opts, - gc_clock::time_point now, uint64_t row_limit) const& -{ - return mutation(*this).query(slice, std::move(accounter), opts, now, row_limit); -} - uint64_t mutation::live_row_count(gc_clock::time_point query_time) const { return partition().live_row_count(*schema(), query_time); diff --git a/mutation.hh b/mutation.hh index e3436d8d71..0862ffdd69 100644 --- a/mutation.hh +++ b/mutation.hh @@ -119,27 +119,6 @@ public: bool operator==(const mutation&) const; bool operator!=(const mutation&) const; public: - // The supplied partition_slice must be governed by this mutation's schema - query::result query(const query::partition_slice&, - query::result_memory_accounter&& accounter, - query::result_options 
opts = query::result_options::only_result(), - gc_clock::time_point now = gc_clock::now(), - uint64_t row_limit = query::max_rows) &&; - - // The supplied partition_slice must be governed by this mutation's schema - // FIXME: Slower than the r-value version - query::result query(const query::partition_slice&, - query::result_memory_accounter&& accounter, - query::result_options opts = query::result_options::only_result(), - gc_clock::time_point now = gc_clock::now(), - uint64_t row_limit = query::max_rows) const&; - - // The supplied partition_slice must be governed by this mutation's schema - void query(query::result::builder& builder, - const query::partition_slice& slice, - gc_clock::time_point now = gc_clock::now(), - uint64_t row_limit = query::max_rows) &&; - // Consumes the mutation's content. // // The mutation is in a moved-from alike state after consumption. diff --git a/mutation_partition.cc b/mutation_partition.cc index 6edf60d548..bc9de2188e 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -908,94 +908,6 @@ bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tomb return any_live; } -void -mutation_partition::query_compacted(query::result::partition_writer& pw, const schema& s, uint64_t limit) const { - check_schema(s); - const query::partition_slice& slice = pw.slice(); - max_timestamp max_ts{pw.last_modified()}; - - if (limit == 0) { - pw.retract(); - return; - } - - auto static_cells_wr = pw.start().start_static_row().start_cells(); - - if (!slice.static_columns.empty()) { - if (pw.requested_result()) { - get_compacted_row_slice(s, slice, column_kind::static_column, static_row().get(), slice.static_columns, static_cells_wr); - } - if (pw.requested_digest()) { - auto pt = partition_tombstone(); - pw.digest().feed_hash(pt); - max_ts.update(pt.timestamp); - pw.digest().feed_hash(static_row().get(), s, column_kind::static_column, slice.static_columns, max_ts); - } - } - - auto rows_wr = 
std::move(static_cells_wr).end_cells() - .end_static_row() - .start_rows(); - - uint64_t row_count = 0; - - auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); - auto send_ck = slice.options.contains(query::partition_slice::option::send_clustering_key); - for_each_row(s, query::clustering_range::make_open_ended_both_sides(), is_reversed, [&] (const rows_entry& e) { - if (e.dummy()) { - return stop_iteration::no; - } - auto& row = e.row(); - auto row_tombstone = tombstone_for_row(s, e); - - if (pw.requested_digest()) { - pw.digest().feed_hash(e.key(), s); - pw.digest().feed_hash(row_tombstone); - max_ts.update(row_tombstone.tomb().timestamp); - pw.digest().feed_hash(row.cells(), s, column_kind::regular_column, slice.regular_columns, max_ts); - } - - if (row.is_live(s)) { - if (pw.requested_result()) { - auto cells_wr = [&] { - if (send_ck) { - return rows_wr.add().write_key(e.key()).start_cells().start_cells(); - } else { - return rows_wr.add().skip_key().start_cells().start_cells(); - } - }(); - get_compacted_row_slice(s, slice, column_kind::regular_column, row.cells(), slice.regular_columns, cells_wr); - std::move(cells_wr).end_cells().end_cells().end_qr_clustered_row(); - } - ++row_count; - if (--limit == 0) { - return stop_iteration::yes; - } - } - return stop_iteration::no; - }); - - pw.last_modified() = max_ts.max; - - // If we got no rows, but have live static columns, we should only - // give them back IFF we did not have any CK restrictions. - // #589 - // If ck:s exist, and we do a restriction on them, we either have maching - // rows, or return nothing, since cql does not allow "is null". 
- bool return_static_content_on_partition_with_no_rows = - pw.slice().options.contains(query::partition_slice::option::always_return_static_content) || - !has_ck_selector(pw.ranges()); - if (row_count == 0 - && (!return_static_content_on_partition_with_no_rows - || !has_any_live_data(s, column_kind::static_column, static_row().get()))) { - pw.retract(); - } else { - pw.row_count() += row_count ? : 1; - pw.partition_count() += 1; - std::move(rows_wr).end_rows().end_qr_partition(); - } -} - std::ostream& operator<<(std::ostream& os, const std::pair& c) { return fmt_print(os, "{{column: {} {}}}", c.first, c.second); diff --git a/mutation_partition.hh b/mutation_partition.hh index 88e3e0af48..bb0ef14c78 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -1479,11 +1479,6 @@ public: return boost::make_iterator_range(_rows.begin(), _rows.end()) | boost::adaptors::filtered([] (const rows_entry& e) { return bool(!e.dummy()); }); } - // Writes this partition using supplied query result writer. - // The partition should be first compacted with compact_for_query(), otherwise - // results may include data which is deleted/expired. - // At most row_limit CQL rows will be written and digested. - void query_compacted(query::result::partition_writer& pw, const schema& s, uint64_t row_limit) const; void accept(const schema&, mutation_partition_visitor&) const; // Returns the number of live CQL rows in this partition.