From 0c24c18d0c0d48e428957bdc68cccfd284083793 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 15 Sep 2021 12:43:25 +0200 Subject: [PATCH 1/9] test: cql_query_test: fix test_query_limit for reversed queries (Single-partition) reversed queries are no longer unlimited but some places still treat them as such. This causes, for example, shorter pages for such queries, which breaks a test that expects certain results to come in a single page. --- service/storage_proxy.cc | 5 +++ test/boost/cql_query_test.cc | 59 +++++++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 38e4a27b9d..f58cf99d9f 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1328,6 +1328,11 @@ endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_v query::max_result_size storage_proxy::get_max_result_size(const query::partition_slice& slice) const { // Unpaged and reverse queries. + // FIXME: some reversed queries are no longer unlimited. + // To filter these out we need to know if it's a single partition query. + // Take the partition key range as a parameter? This would require changing many call sites + // where the partition key range is not immediately available. + // For now we ignore the issue, return the 'wrong' limit for some reversed queries, and wait until all reversed queries are fixed. if (!slice.options.contains() || slice.options.contains()) { return _db.local().get_unlimited_query_max_result_size(); } else { diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 2eed88f3bc..b0ba127ace 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4585,6 +4585,29 @@ SEASTAR_TEST_CASE(test_impossible_where) { }); } +// FIXME: copy-pasta +static bool has_more_pages(::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + return rows->rs().get_metadata().flags().contains(cql3::metadata::flag::HAS_MORE_PAGES); +}; + +static size_t count_rows_fetched(::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + return rows->rs().result_set().size(); +}; + +static lw_shared_ptr extract_paging_state(::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + auto paging_state = rows->rs().get_metadata().paging_state(); + if (!paging_state) { + return nullptr; + } + return make_lw_shared(*paging_state); +}; + SEASTAR_THREAD_TEST_CASE(test_query_limit) { cql_test_config cfg; @@ -4629,18 +4652,38 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) { const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC"); int32_t page_size = is_paged ? 10000 : -1; - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, - cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()}); - const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows; + const auto& expected_rows = is_reversed ? reversed_rows : normal_rows; try { - auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr qo) { - return e.execute_cql(q, std::move(qo)); - }, select_query, std::move(qo)).get0(); - assert_that(std::move(result)) + bool has_more_pages = true; + lw_shared_ptr paging_state = nullptr; + size_t next_expected_row_idx = 0; + while (has_more_pages) { + // FIXME: even though we chose a large page size, for reversed queries we may still obtain multiple pages. + // This happens because the query result size limit for reversed queries is set to the 'unlimited query hard limit' + // (even though single-partition reversed queries are no longer 'unlimited') which we set to a low value, + // causing reversed queries to finish early. + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, + cql3::query_options::specific_options{page_size, paging_state, {}, api::new_timestamp()}); + auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr qo) { + return e.execute_cql(q, std::move(qo)); + }, select_query, std::move(qo)).get0(); + + auto rows_fetched = count_rows_fetched(result); + BOOST_REQUIRE(next_expected_row_idx + rows_fetched <= expected_rows.size()); + assert_that(result) .is_rows() - .with_rows(*expected_rows); + .with_rows(std::vector>{ + expected_rows.begin() + next_expected_row_idx, + expected_rows.begin() + next_expected_row_idx + rows_fetched}); + + has_more_pages = ::has_more_pages(result); + paging_state = extract_paging_state(result); + BOOST_REQUIRE(!has_more_pages || paging_state); + next_expected_row_idx += rows_fetched; + } + BOOST_REQUIRE(next_expected_row_idx == expected_rows.size()); if (should_fail) { BOOST_FAIL("Expected exception, but none was thrown."); From dac2509a7fe22eb266df92440ddc787dae968718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Wed, 15 Sep 2021 17:32:56 +0200 Subject: [PATCH 2/9] query: reverse clustering_range --- query-request.hh | 10 ++++++++++ query.cc | 13 ++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/query-request.hh b/query-request.hh index 555d249197..995f15ddfc 100644 --- a/query-request.hh +++ b/query-request.hh @@ -45,6 +45,16 @@ using range = wrapping_range; using ring_position = dht::ring_position; using clustering_range = nonwrapping_range; +// If `range` was supposed to be used with a comparator `cmp`, then +// `reverse(range)` is supposed to be used with a reversed comparator `c`. +// For instance, if it does make sense to do +// range.contains(point, cmp); +// then it also makes sense to do +// reversed(range).contains(point, [](auto x, auto y) { return cmp(y, x); }); +// but it doesn't make sense to do +// reversed(range).contains(point, cmp); +clustering_range reverse(const clustering_range& range); + extern const dht::partition_range full_partition_range; extern const clustering_range full_clustering_range; diff --git a/query.cc b/query.cc index ec12a6239a..f8cdb23cad 100644 --- a/query.cc +++ b/query.cc @@ -115,11 +115,18 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range reversed ? position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed); } + +clustering_range reverse(const clustering_range& range) { + if (range.is_singular()) { + return range; + } + return clustering_range(range.end(), range.start()); +} + + static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) { for (auto& range : ranges) { - if (!range.is_singular()) { - range = query::clustering_range(range.end(), range.start()); - } + range = reverse(range); } } From fc51d2cc8c2b7ba1862abc39781753a8cfba743a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Tue, 21 Sep 2021 15:41:14 +0200 Subject: [PATCH 3/9] partition_snapshot_reader: separate _schema into _query_schema and _partition_schema After memtable starts supporting reverse order queries, the schema provided to the readers will be reversed (reverse clustering order). Reading from memtable in reverse requires two schemas - one to access the memtable internal data structures (_partition_schema), and the other one (_query_schema), the schema imposing clustering order on returned mutation fragments. This commit prepares for introduction of native reverse queries for memtable, by separating these responsibilities. For now, they are still initialized with the schema passed from query. --- partition_snapshot_reader.hh | 38 ++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 52d1d89614..666447e3c8 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -50,7 +50,14 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public // snapshot, references to region and alloc section) or dropped on any // allocation section retry (_clustering_rows). class lsa_partition_reader { - const schema& _schema; + // _query_schema can be used to retrieve the clustering key order which is used + // for result ordering. This schema is passed from the query and is reversed iff + // the query was reversed (i.e. `Reversing==true`). + const schema& _query_schema; + // _snapshot_schema is a schema that induces the same clustering key order as the + // schema from the underlying snapshot. The schemas mentioned might differ, for + // instance, if a query used newer version of the schema. + const schema_ptr _snapshot_schema; reader_permit _permit; heap_compare _heap_cmp; @@ -89,14 +96,14 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public _clustering_rows.clear(); _range_tombstones.clear(); - rows_entry::tri_compare rows_cmp(_schema); + rows_entry::tri_compare rows_cmp(*_snapshot_schema); for (auto&& v : _snapshot->versions()) { - mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(_schema, ck_range); + mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(*_snapshot_schema, ck_range); auto cr = [&] () -> mutation_partition::rows_type::const_iterator { if (last_row) { return v.partition().clustered_rows().upper_bound(*last_row, rows_cmp); } else { - return v.partition().lower_bound(_schema, ck_range); + return v.partition().lower_bound(*_snapshot_schema, ck_range); } }(); @@ -106,9 +113,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public range_tombstone_list::iterator_range rt_slice = [&] () { if (last_rts) { - return v.partition().row_tombstones().upper_slice(_schema, *last_rts, bound_view::from_range_end(ck_range)); + return v.partition().row_tombstones().upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range)); } else { - return v.partition().row_tombstones().slice(_schema, ck_range); + return v.partition().row_tombstones().slice(*_snapshot_schema, ck_range); } }(); if (rt_slice.begin() != rt_slice.end()) { @@ -164,7 +171,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public explicit lsa_partition_reader(const schema& s, reader_permit permit, partition_snapshot_ptr snp, logalloc::region& region, logalloc::allocating_section& read_section, bool digest_requested) - : _schema(s) + : _query_schema(s) + , _snapshot_schema(s.shared_from_this()) , _permit(permit) , _heap_cmp(s) , _snapshot(std::move(snp)) @@ -207,23 +215,23 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return in_alloc_section([&] () -> mutation_fragment_opt { maybe_refresh_state(ck_range, last_row, last_rts); - position_in_partition::equal_compare rows_eq(_schema); + position_in_partition::equal_compare rows_eq(_query_schema); while (has_more_rows()) { const rows_entry& e = pop_clustering_row(); if (e.dummy()) { continue; } if (_digest_requested) { - e.row().cells().prepare_hash(_schema, column_kind::regular_column); + e.row().cells().prepare_hash(_query_schema, column_kind::regular_column); } - auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _schema, _permit, _schema, e); + auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _query_schema, _permit, _query_schema, e); while (has_more_rows() && rows_eq(peek_row().position(), result.as_clustering_row().position())) { const rows_entry& e = pop_clustering_row(); if (_digest_requested) { - e.row().cells().prepare_hash(_schema, column_kind::regular_column); + e.row().cells().prepare_hash(_query_schema, column_kind::regular_column); } - result.mutate_as_clustering_row(_schema, [&] (clustering_row& cr) mutable { - cr.apply(_schema, e); + result.mutate_as_clustering_row(_query_schema, [&] (clustering_row& cr) mutable { + cr.apply(_query_schema, e); }); } return result; @@ -242,12 +250,12 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return in_alloc_section([&] () -> mutation_fragment_opt { maybe_refresh_state(ck_range, last_row, last_rts); - position_in_partition::less_compare rt_less(_schema); + position_in_partition::less_compare rt_less(_query_schema); while (has_more_range_tombstones() && !rt_less(pos, peek_range_tombstone().position()) && (_rt_stream.empty() || !rt_less(_rt_stream.peek_next().position(), peek_range_tombstone().position()))) { range_tombstone rt = pop_range_tombstone(); - if (rt.trim(_schema, + if (rt.trim(_query_schema, position_in_partition_view::for_range_start(ck_range), position_in_partition_view::for_range_end(ck_range))) { _rt_stream.apply(std::move(rt)); From a672b8b86f2fbb2fc1a6be5d35b3f4baaa3f8a3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Tue, 14 Sep 2021 15:29:50 +0200 Subject: [PATCH 4/9] partition_snapshot_reader: split responsibility of ck_range Previously, next_range_tombstone took as an argument a clustering key range, which served two purposes. One was for accesing only specified key ranges from the partition, the other was for deciding in which order the mutation fragments should be emitted. This commits separates these responsibilities, since in the advent of native memtable reader, these two responsibilities are no longer common. The split is propagated to the rest of the partition_snapshot_reader.hh to avoid confusion. --- partition_snapshot_reader.hh | 52 +++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 666447e3c8..a66b11b005 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -24,6 +24,7 @@ #include "partition_version.hh" #include "flat_mutation_reader.hh" #include "clustering_key_filter.hh" +#include "query-request.hh" #include template @@ -80,17 +81,17 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return fn(); }); } - void maybe_refresh_state(const query::clustering_range& ck_range, + void maybe_refresh_state(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { auto mark = _snapshot->get_change_mark(); if (mark != _change_mark) { - do_refresh_state(ck_range, last_row, last_rts); + do_refresh_state(ck_range_snapshot, last_row, last_rts); _change_mark = mark; } } - void do_refresh_state(const query::clustering_range& ck_range, + void do_refresh_state(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { _clustering_rows.clear(); @@ -98,12 +99,12 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public rows_entry::tri_compare rows_cmp(*_snapshot_schema); for (auto&& v : _snapshot->versions()) { - mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(*_snapshot_schema, ck_range); + mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(*_snapshot_schema, ck_range_snapshot); auto cr = [&] () -> mutation_partition::rows_type::const_iterator { if (last_row) { return v.partition().clustered_rows().upper_bound(*last_row, rows_cmp); } else { - return v.partition().lower_bound(*_snapshot_schema, ck_range); + return v.partition().lower_bound(*_snapshot_schema, ck_range_snapshot); } }(); @@ -113,9 +114,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public range_tombstone_list::iterator_range rt_slice = [&] () { if (last_rts) { - return v.partition().row_tombstones().upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range)); + return v.partition().row_tombstones().upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range_snapshot)); } else { - return v.partition().row_tombstones().slice(*_snapshot_schema, ck_range); + return v.partition().row_tombstones().slice(*_snapshot_schema, ck_range_snapshot); } }(); if (rt_slice.begin() != rt_slice.end()) { @@ -182,8 +183,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public , _digest_requested(digest_requested) { } - void reset_state(const query::clustering_range& ck_range) { - do_refresh_state(ck_range, {}, {}); + void reset_state(const query::clustering_range& ck_range_snapshot) { + do_refresh_state(ck_range_snapshot, {}, {}); } template @@ -203,17 +204,17 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public } // Returns next clustered row in the range. - // If the ck_range is the same as the one used previously last_row needs + // If the ck_range_snapshot is the same as the one used previously last_row needs // to be engaged and equal the position of the row returned last time. - // If the ck_range is different or this is the first call to this + // If the ck_range_snapshot is different or this is the first call to this // function last_row has to be disengaged. Additionally, when entering // new range _rt_stream will be populated with all relevant // tombstones. - mutation_fragment_opt next_row(const query::clustering_range& ck_range, + mutation_fragment_opt next_row(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { return in_alloc_section([&] () -> mutation_fragment_opt { - maybe_refresh_state(ck_range, last_row, last_rts); + maybe_refresh_state(ck_range_snapshot, last_row, last_rts); position_in_partition::equal_compare rows_eq(_query_schema); while (has_more_rows()) { @@ -240,7 +241,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public }); } - mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range, + mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range_snapshot, + const query::clustering_range& ck_range_query, const std::optional& last_row, const std::optional& last_rts, position_in_partition_view pos) { @@ -248,16 +250,18 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return _rt_stream.get_next(std::move(pos)); } return in_alloc_section([&] () -> mutation_fragment_opt { - maybe_refresh_state(ck_range, last_row, last_rts); + maybe_refresh_state(ck_range_snapshot, last_row, last_rts); position_in_partition::less_compare rt_less(_query_schema); + while (has_more_range_tombstones() && !rt_less(pos, peek_range_tombstone().position()) && (_rt_stream.empty() || !rt_less(_rt_stream.peek_next().position(), peek_range_tombstone().position()))) { range_tombstone rt = pop_range_tombstone(); + if (rt.trim(_query_schema, - position_in_partition_view::for_range_start(ck_range), - position_in_partition_view::for_range_end(ck_range))) { + position_in_partition_view::for_range_start(ck_range_query), + position_in_partition_view::for_range_end(ck_range_query))) { _rt_stream.apply(std::move(rt)); } } @@ -271,6 +275,9 @@ private: // that its lifetime is appropriately extended. boost::any _container_guard; + // Each range from _ck_ranges are taken to be in snapshot clustering key + // order, i.e. given a comparator derived from snapshot schema, for each ck_range from + // _ck_ranges, begin(ck_range) <= end(ck_range). query::clustering_key_filter_ranges _ck_ranges; query::clustering_row_ranges::const_iterator _current_ck_range; query::clustering_row_ranges::const_iterator _ck_range_end; @@ -298,11 +305,18 @@ private: if (!_next_row && !_no_more_rows_in_current_range) { _next_row = _reader.next_row(*_current_ck_range, _last_entry, _last_rts); } + + // We use the names ck_range_snapshot and ck_range_query to denote clustering order. + // ck_range_snapshot uses the snapshot order, while ck_range_query uses the + // query order. These two differ if the query was reversed (`Reversing==true`). + const auto& ck_range_snapshot = *_current_ck_range; + const auto& ck_range_query = *_current_ck_range; + if (_next_row) { auto pos_view = _next_row->as_clustering_row().position(); _last_entry = position_in_partition(pos_view); - auto mf = _reader.next_range_tombstone(*_current_ck_range, _last_entry, _last_rts, pos_view); + auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, pos_view); if (mf) { _last_rts = mf->as_range_tombstone().position(); return mf; @@ -310,7 +324,7 @@ private: return std::exchange(_next_row, {}); } else { _no_more_rows_in_current_range = true; - auto mf = _reader.next_range_tombstone(*_current_ck_range, _last_entry, _last_rts, position_in_partition_view::for_range_end(*_current_ck_range)); + auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, position_in_partition_view::for_range_end(ck_range_query)); if (mf) { _last_rts = mf->as_range_tombstone().position(); } From 6813c39927800404f8b886a7cbbe880698f3e6d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Tue, 21 Sep 2021 20:31:29 +0200 Subject: [PATCH 5/9] partition_snapshot_reader: rows_position and rows_iter_type supporting reverse iteration Iterating in reverse is useful for native reverse memtable reader. --- partition_snapshot_reader.hh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index a66b11b005..1c64307997 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -29,9 +29,11 @@ template class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public Accounter { + using rows_iter_type = std::conditional_t; struct rows_position { - mutation_partition::rows_type::const_iterator _position; - mutation_partition::rows_type::const_iterator _end; + rows_iter_type _position, _end; }; class heap_compare { From 5449982a0bd0e080b78a9b9c5fd01f15f234b7b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Tue, 14 Sep 2021 17:40:49 +0200 Subject: [PATCH 6/9] memtable, partition_snapshot_reader: read from partition in reverse In this commit, I add the ability to read from partition snapshots in reverse order. Before these changes, a reverse read from memtable has been handled as follows: - A reader higher in the hierarchy of readers performs a read from memtable in the forward order, which is not aware of the intention to read in reverse. - Later, some reader reverses the received mutation fragments. Memtable decides based on options in `slice`, whether to read forward or in reverse. Note that previous commit creates a killswitch which clears the `reverse` option from slice before running the logic of whether to reverse or not. This is due to the fact, that this commit doesn't all the required code changes. The reversing partition snapshot reader maintains two schemas - one that is the reversed schema (called _query_schema) for the output, and the other one (forward one, called _snapshot_schema), which is used to access the memtable tree (which needs to be the same as the schema used to create memtable). The `partition_slice` provided by callers is provided in 'half-reversed' format for reversed queries, where the order of clustering ranges is reversed, but the ranges themselves are not. --- memtable.cc | 42 ++++++++++++++++--- partition_snapshot_reader.hh | 80 ++++++++++++++++++++++++++++-------- 2 files changed, 100 insertions(+), 22 deletions(-) diff --git a/memtable.cc b/memtable.cc index c2088005ba..896b3aa448 100644 --- a/memtable.cc +++ b/memtable.cc @@ -26,6 +26,18 @@ #include "partition_builder.hh" #include "mutation_partition_view.hh" +static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema( + bool is_reversed, + reader_permit permit, + dht::decorated_key dk, + query::clustering_key_filter_ranges crr, + partition_snapshot_ptr snp, + bool digest_requested, + logalloc::region& region, + logalloc::allocating_section& read_section, + boost::any pointer_to_container, + streamed_mutation::forwarding fwd, memtable& memtable); + void memtable::memtable_encoding_stats_collector::update_timestamp(api::timestamp_type ts) { if (ts != api::missing_timestamp) { encoding_stats_collector::update_timestamp(ts); @@ -465,8 +477,8 @@ public: auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key()); auto snp_schema = key_and_snp->second->schema(); bool digest_requested = _slice.options.contains(); - auto mpsr = make_partition_snapshot_flat_reader(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr), - std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl()); + bool is_reversed = _slice.options.contains(query::partition_slice::option::reversed); + auto mpsr = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl()); mpsr.upgrade_schema(schema()); _delegate = std::move(mpsr); } else { @@ -582,6 +594,25 @@ public: } }; +static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema( + bool is_reversed, + reader_permit permit, + dht::decorated_key dk, + query::clustering_key_filter_ranges crr, + partition_snapshot_ptr snp, + bool digest_requested, + logalloc::region& region, + logalloc::allocating_section& read_section, + boost::any pointer_to_container, + streamed_mutation::forwarding fwd, memtable& memtable) { + if (is_reversed) { + schema_ptr rev_snp_schema = snp->schema()->make_reversed(); + return make_partition_snapshot_flat_reader(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable); + } else { + return make_partition_snapshot_flat_reader(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable); + } +} + class flush_reader final : public flat_mutation_reader::impl, private iterator_reader { // FIXME: Similarly to scanning_reader we have an underlying // flat_mutation_reader for each partition. This is suboptimal. @@ -618,7 +649,7 @@ private: update_last(key_and_snp->first); auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key()); auto snp_schema = key_and_snp->second->schema(); - auto mpsr = make_partition_snapshot_flat_reader(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr), + auto mpsr = make_partition_snapshot_flat_reader(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory); mpsr.upgrade_schema(schema()); _partition_reader = std::move(mpsr); @@ -679,6 +710,7 @@ memtable::do_make_flat_reader(schema_ptr s, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { + bool is_reversed = slice.options.contains(query::partition_slice::option::reversed); if (query::is_single_partition(range) && !fwd_mr) { const query::ring_position& pos = range.start()->value(); auto snp = _read_section(*this, [&] () -> partition_snapshot_ptr { @@ -695,10 +727,8 @@ memtable::do_make_flat_reader(schema_ptr s, } auto dk = pos.as_decorated_key(); auto cr = query::clustering_key_filter_ranges::get_ranges(*s, slice, dk.key()); - auto snp_schema = snp->schema(); bool digest_requested = slice.options.contains(); - auto rd = make_partition_snapshot_flat_reader(snp_schema, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, - *this, _read_section, shared_from_this(), fwd, *this); + auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _read_section, shared_from_this(), fwd, *this); rd.upgrade_schema(s); return rd; } else { diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 1c64307997..65d0f36706 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -27,7 +27,7 @@ #include "query-request.hh" #include -template +template class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public Accounter { using rows_iter_type = std::conditional_tposition(), a._position->position()); + if constexpr (Reversing) { + // Here `reversed()` doesn't change anything, but keep it for consistency. + return _less(b._position->position().reversed(), a._position->position().reversed()); + } else { + return _less(b._position->position(), a._position->position()); + } } bool operator()(const range_tombstone_list::iterator_range& a, const range_tombstone_list::iterator_range& b) { - return _less(b.front().position(), a.front().position()); + if constexpr (Reversing) { + return _less(b.back().end_position().reversed(), a.back().end_position().reversed()); + } else { + return _less(b.front().position(), a.front().position()); + } } }; @@ -93,6 +111,25 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public } } + // In reversing mode, upper and lower bounds still need to be executed against + // snapshot schema and ck_range, however we need them to search from "opposite" direction. + template + static rows_iter_type lower_bound(const T& t, Args... args) { + if constexpr (Reversing) { + return make_iterator(t.upper_bound(std::forward(args)...)); + } else { + return make_iterator(t.lower_bound(std::forward(args)...)); + } + } + template + static rows_iter_type upper_bound(const T& t, Args... args) { + if constexpr (Reversing) { + return make_iterator(t.lower_bound(std::forward(args)...)); + } else { + return make_iterator(t.upper_bound(std::forward(args)...)); + } + } + void do_refresh_state(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { @@ -101,14 +138,14 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public rows_entry::tri_compare rows_cmp(*_snapshot_schema); for (auto&& v : _snapshot->versions()) { - mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(*_snapshot_schema, ck_range_snapshot); - auto cr = [&] () -> mutation_partition::rows_type::const_iterator { + auto cr = [&] () { if (last_row) { - return v.partition().clustered_rows().upper_bound(*last_row, rows_cmp); + return upper_bound(v.partition().clustered_rows(), *last_row, rows_cmp); } else { - return v.partition().lower_bound(*_snapshot_schema, ck_range_snapshot); + return lower_bound(v.partition(), *_snapshot_schema, ck_range_snapshot); } }(); + auto cr_end = upper_bound(v.partition(), *_snapshot_schema, ck_range_snapshot); if (cr != cr_end) { _clustering_rows.emplace_back(rows_position { cr, cr_end }); @@ -143,11 +180,16 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return e; } - const range_tombstone& pop_range_tombstone() { + range_tombstone pop_range_tombstone() { boost::range::pop_heap(_range_tombstones, _heap_cmp); auto& current = _range_tombstones.back(); - const range_tombstone& rt = current.begin()->tombstone(); - current.advance_begin(1); + range_tombstone rt = (Reversing ? std::prev(current.end()) : current.begin())->tombstone(); + if constexpr (Reversing) { + current.advance_end(-1); + rt.reverse(); + } else { + current.advance_begin(1); + } if (current.begin() == current.end()) { _range_tombstones.pop_back(); } else { @@ -164,8 +206,14 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return !_clustering_rows.empty(); } - const range_tombstone& peek_range_tombstone() const { - return _range_tombstones.front().begin()->tombstone(); + range_tombstone peek_range_tombstone() const { + if constexpr (Reversing) { + range_tombstone rt = std::prev(_range_tombstones.front().end())->tombstone(); + rt.reverse(); + return rt; + } else { + return _range_tombstones.front().begin()->tombstone(); + } } bool has_more_range_tombstones() const { return !_range_tombstones.empty(); @@ -175,7 +223,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public logalloc::region& region, logalloc::allocating_section& read_section, bool digest_requested) : _query_schema(s) - , _snapshot_schema(s.shared_from_this()) + , _snapshot_schema(Reversing ? s.make_reversed() : s.shared_from_this()) , _permit(permit) , _heap_cmp(s) , _snapshot(std::move(snp)) @@ -228,6 +276,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public e.row().cells().prepare_hash(_query_schema, column_kind::regular_column); } auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _query_schema, _permit, _query_schema, e); + // TODO: Ideally this should be position() or position().reversed(), depending on Reversing. while (has_more_rows() && rows_eq(peek_row().position(), result.as_clustering_row().position())) { const rows_entry& e = pop_clustering_row(); if (_digest_requested) { @@ -267,7 +316,6 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public _rt_stream.apply(std::move(rt)); } } - return _rt_stream.get_next(std::move(pos)); }); } @@ -416,7 +464,7 @@ public: } }; -template +template inline flat_mutation_reader make_partition_snapshot_flat_reader(schema_ptr s, reader_permit permit, @@ -430,7 +478,7 @@ make_partition_snapshot_flat_reader(schema_ptr s, streamed_mutation::forwarding fwd, Args&&... args) { - auto res = make_flat_mutation_reader>(std::move(s), std::move(permit), std::move(dk), + auto res = make_flat_mutation_reader>(std::move(s), std::move(permit), std::move(dk), snp, std::move(crr), digest_requested, region, read_section, std::move(pointer_to_container), std::forward(args)...); if (fwd) { return make_forwardable(std::move(res)); // FIXME: optimize From cc5ea669575a92c3757c8af7dcefd5ad5a21e0b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Tue, 21 Sep 2021 19:51:21 +0200 Subject: [PATCH 7/9] partition_snapshot_reader: reverse ck_range when needed by Reversing Previous commits made it possible to split the responsibility of two kinds of clustering key ranges in read_next and next_range_tombstone. Here, the actual reversal takes place and we start passing the actually reversed ck_range, if Reversing. This reversed ck_range is stored as a class member, so that the reversal happens just once for each range. --- partition_snapshot_reader.hh | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 65d0f36706..6eaab11e17 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -332,6 +332,9 @@ private: query::clustering_row_ranges::const_iterator _current_ck_range; query::clustering_row_ranges::const_iterator _ck_range_end; + // Holds reversed current clustering key range, if Reversing was needed. + std::optional opt_reversed_range; + std::optional _last_entry; std::optional _last_rts; mutation_fragment_opt _next_row; @@ -352,15 +355,15 @@ private: } mutation_fragment_opt read_next() { - if (!_next_row && !_no_more_rows_in_current_range) { - _next_row = _reader.next_row(*_current_ck_range, _last_entry, _last_rts); - } - // We use the names ck_range_snapshot and ck_range_query to denote clustering order. // ck_range_snapshot uses the snapshot order, while ck_range_query uses the // query order. These two differ if the query was reversed (`Reversing==true`). const auto& ck_range_snapshot = *_current_ck_range; - const auto& ck_range_query = *_current_ck_range; + const auto& ck_range_query = opt_reversed_range ? *opt_reversed_range : ck_range_snapshot; + + if (!_next_row && !_no_more_rows_in_current_range) { + _next_row = _reader.next_row(ck_range_snapshot, _last_entry, _last_rts); + } if (_next_row) { auto pos_view = _next_row->as_clustering_row().position(); @@ -397,6 +400,15 @@ private: _no_more_rows_in_current_range = false; } + void fill_opt_reversed_range() { + opt_reversed_range = std::nullopt; + if (_current_ck_range != _ck_range_end) { + if constexpr (Reversing) { + opt_reversed_range = query::reverse(*_current_ck_range); + } + } + } + void do_fill_buffer() { while (!is_end_of_stream() && !is_buffer_full()) { auto mfopt = read_next(); @@ -406,6 +418,7 @@ private: _last_entry = std::nullopt; _last_rts = std::nullopt; _current_ck_range = std::next(_current_ck_range); + fill_opt_reversed_range(); on_new_range(); } if (need_preempt()) { @@ -427,6 +440,7 @@ public: , _ck_range_end(_ck_ranges.end()) , _reader(*_schema, _permit, std::move(snp), region, read_section, digest_requested) { + fill_opt_reversed_range(); _reader.with_reserve([&] { push_mutation_fragment(*_schema, _permit, partition_start(std::move(dk), _reader.partition_tombstone())); }); From 771f3b12bd40f425d8a6c27b7560fca7f4fe0ea9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Mon, 20 Sep 2021 14:09:03 +0200 Subject: [PATCH 8/9] memtable: enable native reversing This commit consists of changes, which need to reside in a single commit, so that the tests pass on each of the commits. 1. Remove do_make_flat_reader which disabled reverse reads by making the slice a forward one. Remove call to get_ranges which would do superfluous reversal of clustering ranges. 2. test: cql_query_test: remove expectation that the test_query_limit fails for reversed queries, since reversed queries no longer require linear memory wrt. the result size, when paginated. --- memtable.cc | 52 ++++++++++++------------------------ memtable.hh | 2 -- test/boost/cql_query_test.cc | 2 +- 3 files changed, 18 insertions(+), 38 deletions(-) diff --git a/memtable.cc b/memtable.cc index 896b3aa448..e833296b8c 100644 --- a/memtable.cc +++ b/memtable.cc @@ -474,7 +474,11 @@ public: }); if (key_and_snp) { update_last(key_and_snp->first); - auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key()); + + const query::clustering_row_ranges& ranges = _slice.row_ranges(*schema(), key_and_snp->first.key()); + // TODO: when the slice passed from query finally changes format from half-reversed into native reversed, this line needs to change. + auto cr = query::clustering_key_filter_ranges(ranges); + auto snp_schema = key_and_snp->second->schema(); bool digest_requested = _slice.options.contains(); bool is_reversed = _slice.options.contains(query::partition_slice::option::reversed); @@ -702,7 +706,7 @@ partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) { } flat_mutation_reader -memtable::do_make_flat_reader(schema_ptr s, +memtable::make_flat_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -726,47 +730,25 @@ memtable::do_make_flat_reader(schema_ptr s, return make_empty_flat_reader(std::move(s), std::move(permit)); } auto dk = pos.as_decorated_key(); - auto cr = query::clustering_key_filter_ranges::get_ranges(*s, slice, dk.key()); + + const query::clustering_row_ranges& ranges = slice.row_ranges(*s, dk.key()); + // TODO: when the slice passed from query finally changes format from half-reversed into native reversed, this line needs to change. + auto cr = query::clustering_key_filter_ranges(ranges); + bool digest_requested = slice.options.contains(); auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _read_section, shared_from_this(), fwd, *this); rd.upgrade_schema(s); return rd; } else { - return make_flat_mutation_reader(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr); + auto res = make_flat_mutation_reader(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr); + if (fwd == streamed_mutation::forwarding::yes) { + return make_forwardable(std::move(res)); + } else { + return res; + } } } -flat_mutation_reader -memtable::make_flat_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& query_slice, - const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - // When the memtable is flushed while a scanning read is ongoing, an sstable - // reader is created to replace the memtable reader mid-air. This sstable - // reader will get the memtable's slice. We don't want this sstable reader - // to read in reverse as we are reversing the stream on top of the memtable - // reader. Unreverse the slice here so when it is passed to the sstable - // reader it doesn't try to read in reverse. This is not required technically - // for single partition reads, but we do it anyway to keep things simple. - std::unique_ptr unreversed_slice; - const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed); - auto fwd_sm = fwd; - if (reversed) { - fwd_sm = streamed_mutation::forwarding::no; - s = s->make_reversed(); - unreversed_slice = std::make_unique(query::half_reverse_slice(*s, query_slice)); - } - const auto& slice = reversed ? *unreversed_slice : query_slice; - - auto rd = do_make_flat_reader(std::move(s), permit, range, slice, pc, std::move(trace_state_ptr), fwd_sm, fwd_mr); - - if (reversed) { - rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice)); - } - if (fwd && (reversed || !query::is_single_partition(range) || fwd_mr)) { - rd = make_forwardable(std::move(rd)); - } - return rd; -} - flat_mutation_reader memtable::make_flush_reader(schema_ptr s, reader_permit permit, const io_priority_class& pc) { if (group()) { diff --git a/memtable.hh b/memtable.hh index dcb299049b..8c43675d80 100644 --- a/memtable.hh +++ b/memtable.hh @@ -177,8 +177,6 @@ private: void remove_flushed_memory(uint64_t); void clear() noexcept; uint64_t dirty_size() const; - flat_mutation_reader do_make_flat_reader(schema_ptr, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, - const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr); public: explicit memtable(schema_ptr schema, dirty_memory_manager&, table_stats& table_stats, memtable_list *memtable_list = nullptr, seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group()); diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index b0ba127ace..2fdfc4dd22 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4647,7 +4647,7 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) { for (auto is_paged : {true, false}) { for (auto is_reversed : {true, false}) { for (auto scheduling_group : {db.get_statement_scheduling_group(), db.get_streaming_scheduling_group(), default_scheduling_group()}) { - const auto should_fail = (!is_paged || is_reversed) && scheduling_group == db.get_statement_scheduling_group(); + const auto should_fail = !is_paged && scheduling_group == db.get_statement_scheduling_group(); testlog.info("checking: is_paged={}, is_reversed={}, scheduling_group={}, should_fail={}", is_paged, is_reversed, scheduling_group.name(), should_fail); const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC"); From c04dffbc01ddc0096862d208f462bfbeef37f47f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Radwa=C5=84ski?= Date: Mon, 20 Sep 2021 14:03:08 +0200 Subject: [PATCH 9/9] partition_snapshot_reader: pop_range_tombstone returns reference (instead of value) when possible. --- partition_snapshot_reader.hh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 6eaab11e17..5763320e26 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -206,7 +206,10 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return !_clustering_rows.empty(); } - range_tombstone peek_range_tombstone() const { + // Let's not lose performance when not Reversing. + using peeked_range_tombstone = std::conditional_t; + + peeked_range_tombstone peek_range_tombstone() const { if constexpr (Reversing) { range_tombstone rt = std::prev(_range_tombstones.front().end())->tombstone(); rt.reverse();