diff --git a/memtable.cc b/memtable.cc index c2088005ba..e833296b8c 100644 --- a/memtable.cc +++ b/memtable.cc @@ -26,6 +26,18 @@ #include "partition_builder.hh" #include "mutation_partition_view.hh" +static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema( + bool is_reversed, + reader_permit permit, + dht::decorated_key dk, + query::clustering_key_filter_ranges crr, + partition_snapshot_ptr snp, + bool digest_requested, + logalloc::region& region, + logalloc::allocating_section& read_section, + boost::any pointer_to_container, + streamed_mutation::forwarding fwd, memtable& memtable); + void memtable::memtable_encoding_stats_collector::update_timestamp(api::timestamp_type ts) { if (ts != api::missing_timestamp) { encoding_stats_collector::update_timestamp(ts); @@ -462,11 +474,15 @@ public: }); if (key_and_snp) { update_last(key_and_snp->first); - auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key()); + + const query::clustering_row_ranges& ranges = _slice.row_ranges(*schema(), key_and_snp->first.key()); + // TODO: when the slice passed from query finally changes format from half-reversed into native reversed, this line needs to change. + auto cr = query::clustering_key_filter_ranges(ranges); + auto snp_schema = key_and_snp->second->schema(); bool digest_requested = _slice.options.contains(); - auto mpsr = make_partition_snapshot_flat_reader(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr), - std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl()); + bool is_reversed = _slice.options.contains(query::partition_slice::option::reversed); + auto mpsr = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl()); mpsr.upgrade_schema(schema()); _delegate = std::move(mpsr); } else { @@ -582,6 +598,25 @@ public: } }; +static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema( + bool is_reversed, + reader_permit permit, + dht::decorated_key dk, + query::clustering_key_filter_ranges crr, + partition_snapshot_ptr snp, + bool digest_requested, + logalloc::region& region, + logalloc::allocating_section& read_section, + boost::any pointer_to_container, + streamed_mutation::forwarding fwd, memtable& memtable) { + if (is_reversed) { + schema_ptr rev_snp_schema = snp->schema()->make_reversed(); + return make_partition_snapshot_flat_reader(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable); + } else { + return make_partition_snapshot_flat_reader(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable); + } +} + class flush_reader final : public flat_mutation_reader::impl, private iterator_reader { // FIXME: Similarly to scanning_reader we have an underlying // flat_mutation_reader for each partition. This is suboptimal. @@ -618,7 +653,7 @@ private: update_last(key_and_snp->first); auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key()); auto snp_schema = key_and_snp->second->schema(); - auto mpsr = make_partition_snapshot_flat_reader(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr), + auto mpsr = make_partition_snapshot_flat_reader(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory); mpsr.upgrade_schema(schema()); _partition_reader = std::move(mpsr); @@ -671,7 +706,7 @@ partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) { } flat_mutation_reader -memtable::do_make_flat_reader(schema_ptr s, +memtable::make_flat_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -679,6 +714,7 @@ memtable::do_make_flat_reader(schema_ptr s, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { + bool is_reversed = slice.options.contains(query::partition_slice::option::reversed); if (query::is_single_partition(range) && !fwd_mr) { const query::ring_position& pos = range.start()->value(); auto snp = _read_section(*this, [&] () -> partition_snapshot_ptr { @@ -694,49 +730,25 @@ memtable::do_make_flat_reader(schema_ptr s, return make_empty_flat_reader(std::move(s), std::move(permit)); } auto dk = pos.as_decorated_key(); - auto cr = query::clustering_key_filter_ranges::get_ranges(*s, slice, dk.key()); - auto snp_schema = snp->schema(); + + const query::clustering_row_ranges& ranges = slice.row_ranges(*s, dk.key()); + // TODO: when the slice passed from query finally changes format from half-reversed into native reversed, this line needs to change. + auto cr = query::clustering_key_filter_ranges(ranges); + bool digest_requested = slice.options.contains(); - auto rd = make_partition_snapshot_flat_reader(snp_schema, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, - *this, _read_section, shared_from_this(), fwd, *this); + auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _read_section, shared_from_this(), fwd, *this); rd.upgrade_schema(s); return rd; } else { - return make_flat_mutation_reader(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr); + auto res = make_flat_mutation_reader(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr); + if (fwd == streamed_mutation::forwarding::yes) { + return make_forwardable(std::move(res)); + } else { + return res; + } } } -flat_mutation_reader -memtable::make_flat_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& query_slice, - const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - // When the memtable is flushed while a scanning read is ongoing, an sstable - // reader is created to replace the memtable reader mid-air. This sstable - // reader will get the memtable's slice. We don't want this sstable reader - // to read in reverse as we are reversing the stream on top of the memtable - // reader. Unreverse the slice here so when it is passed to the sstable - // reader it doesn't try to read in reverse. This is not required technically - // for single partition reads, but we do it anyway to keep things simple. - std::unique_ptr unreversed_slice; - const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed); - auto fwd_sm = fwd; - if (reversed) { - fwd_sm = streamed_mutation::forwarding::no; - s = s->make_reversed(); - unreversed_slice = std::make_unique(query::half_reverse_slice(*s, query_slice)); - } - const auto& slice = reversed ? *unreversed_slice : query_slice; - - auto rd = do_make_flat_reader(std::move(s), permit, range, slice, pc, std::move(trace_state_ptr), fwd_sm, fwd_mr); - - if (reversed) { - rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice)); - } - if (fwd && (reversed || !query::is_single_partition(range) || fwd_mr)) { - rd = make_forwardable(std::move(rd)); - } - return rd; -} - flat_mutation_reader memtable::make_flush_reader(schema_ptr s, reader_permit permit, const io_priority_class& pc) { if (group()) { diff --git a/memtable.hh b/memtable.hh index dcb299049b..8c43675d80 100644 --- a/memtable.hh +++ b/memtable.hh @@ -177,8 +177,6 @@ private: void remove_flushed_memory(uint64_t); void clear() noexcept; uint64_t dirty_size() const; - flat_mutation_reader do_make_flat_reader(schema_ptr, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, - const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr); public: explicit memtable(schema_ptr schema, dirty_memory_manager&, table_stats& table_stats, memtable_list *memtable_list = nullptr, seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group()); diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 52d1d89614..5763320e26 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -24,24 +24,45 @@ #include "partition_version.hh" #include "flat_mutation_reader.hh" #include "clustering_key_filter.hh" +#include "query-request.hh" #include -template +template class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public Accounter { + using rows_iter_type = std::conditional_t; struct rows_position { - mutation_partition::rows_type::const_iterator _position; - mutation_partition::rows_type::const_iterator _end; + rows_iter_type _position, _end; }; + static rows_iter_type make_iterator(mutation_partition::rows_type::const_iterator it) { + if constexpr (Reversing) { + return std::make_reverse_iterator(it); + } else { + return it; + } + } + class heap_compare { position_in_partition::less_compare _less; public: + // `s` shall be native to the query clustering order. explicit heap_compare(const schema& s) : _less(s) { } bool operator()(const rows_position& a, const rows_position& b) { - return _less(b._position->position(), a._position->position()); + if constexpr (Reversing) { + // Here `reversed()` doesn't change anything, but keep it for consistency. + return _less(b._position->position().reversed(), a._position->position().reversed()); + } else { + return _less(b._position->position(), a._position->position()); + } } bool operator()(const range_tombstone_list::iterator_range& a, const range_tombstone_list::iterator_range& b) { - return _less(b.front().position(), a.front().position()); + if constexpr (Reversing) { + return _less(b.back().end_position().reversed(), a.back().end_position().reversed()); + } else { + return _less(b.front().position(), a.front().position()); + } } }; @@ -50,7 +71,14 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public // snapshot, references to region and alloc section) or dropped on any // allocation section retry (_clustering_rows). class lsa_partition_reader { - const schema& _schema; + // _query_schema can be used to retrieve the clustering key order which is used + // for result ordering. This schema is passed from the query and is reversed iff + // the query was reversed (i.e. `Reversing==true`). + const schema& _query_schema; + // _snapshot_schema is a schema that induces the same clustering key order as the + // schema from the underlying snapshot. The schemas mentioned might differ, for + // instance, if a query used newer version of the schema. + const schema_ptr _snapshot_schema; reader_permit _permit; heap_compare _heap_cmp; @@ -73,32 +101,51 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return fn(); }); } - void maybe_refresh_state(const query::clustering_range& ck_range, + void maybe_refresh_state(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { auto mark = _snapshot->get_change_mark(); if (mark != _change_mark) { - do_refresh_state(ck_range, last_row, last_rts); + do_refresh_state(ck_range_snapshot, last_row, last_rts); _change_mark = mark; } } - void do_refresh_state(const query::clustering_range& ck_range, + // In reversing mode, upper and lower bounds still need to be executed against + // snapshot schema and ck_range, however we need them to search from "opposite" direction. + template + static rows_iter_type lower_bound(const T& t, Args... args) { + if constexpr (Reversing) { + return make_iterator(t.upper_bound(std::forward(args)...)); + } else { + return make_iterator(t.lower_bound(std::forward(args)...)); + } + } + template + static rows_iter_type upper_bound(const T& t, Args... args) { + if constexpr (Reversing) { + return make_iterator(t.lower_bound(std::forward(args)...)); + } else { + return make_iterator(t.upper_bound(std::forward(args)...)); + } + } + + void do_refresh_state(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { _clustering_rows.clear(); _range_tombstones.clear(); - rows_entry::tri_compare rows_cmp(_schema); + rows_entry::tri_compare rows_cmp(*_snapshot_schema); for (auto&& v : _snapshot->versions()) { - mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(_schema, ck_range); - auto cr = [&] () -> mutation_partition::rows_type::const_iterator { + auto cr = [&] () { if (last_row) { - return v.partition().clustered_rows().upper_bound(*last_row, rows_cmp); + return upper_bound(v.partition().clustered_rows(), *last_row, rows_cmp); } else { - return v.partition().lower_bound(_schema, ck_range); + return lower_bound(v.partition(), *_snapshot_schema, ck_range_snapshot); } }(); + auto cr_end = upper_bound(v.partition(), *_snapshot_schema, ck_range_snapshot); if (cr != cr_end) { _clustering_rows.emplace_back(rows_position { cr, cr_end }); @@ -106,9 +153,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public range_tombstone_list::iterator_range rt_slice = [&] () { if (last_rts) { - return v.partition().row_tombstones().upper_slice(_schema, *last_rts, bound_view::from_range_end(ck_range)); + return v.partition().row_tombstones().upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range_snapshot)); } else { - return v.partition().row_tombstones().slice(_schema, ck_range); + return v.partition().row_tombstones().slice(*_snapshot_schema, ck_range_snapshot); } }(); if (rt_slice.begin() != rt_slice.end()) { @@ -133,11 +180,16 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return e; } - const range_tombstone& pop_range_tombstone() { + range_tombstone pop_range_tombstone() { boost::range::pop_heap(_range_tombstones, _heap_cmp); auto& current = _range_tombstones.back(); - const range_tombstone& rt = current.begin()->tombstone(); - current.advance_begin(1); + range_tombstone rt = (Reversing ? std::prev(current.end()) : current.begin())->tombstone(); + if constexpr (Reversing) { + current.advance_end(-1); + rt.reverse(); + } else { + current.advance_begin(1); + } if (current.begin() == current.end()) { _range_tombstones.pop_back(); } else { @@ -154,8 +206,17 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return !_clustering_rows.empty(); } - const range_tombstone& peek_range_tombstone() const { - return _range_tombstones.front().begin()->tombstone(); + // Let's not lose performance when not Reversing. + using peeked_range_tombstone = std::conditional_t; + + peeked_range_tombstone peek_range_tombstone() const { + if constexpr (Reversing) { + range_tombstone rt = std::prev(_range_tombstones.front().end())->tombstone(); + rt.reverse(); + return rt; + } else { + return _range_tombstones.front().begin()->tombstone(); + } } bool has_more_range_tombstones() const { return !_range_tombstones.empty(); @@ -164,7 +225,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public explicit lsa_partition_reader(const schema& s, reader_permit permit, partition_snapshot_ptr snp, logalloc::region& region, logalloc::allocating_section& read_section, bool digest_requested) - : _schema(s) + : _query_schema(s) + , _snapshot_schema(Reversing ? s.make_reversed() : s.shared_from_this()) , _permit(permit) , _heap_cmp(s) , _snapshot(std::move(snp)) @@ -174,8 +236,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public , _digest_requested(digest_requested) { } - void reset_state(const query::clustering_range& ck_range) { - do_refresh_state(ck_range, {}, {}); + void reset_state(const query::clustering_range& ck_range_snapshot) { + do_refresh_state(ck_range_snapshot, {}, {}); } template @@ -195,35 +257,36 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public } // Returns next clustered row in the range. - // If the ck_range is the same as the one used previously last_row needs + // If the ck_range_snapshot is the same as the one used previously last_row needs // to be engaged and equal the position of the row returned last time. - // If the ck_range is different or this is the first call to this + // If the ck_range_snapshot is different or this is the first call to this // function last_row has to be disengaged. Additionally, when entering // new range _rt_stream will be populated with all relevant // tombstones. - mutation_fragment_opt next_row(const query::clustering_range& ck_range, + mutation_fragment_opt next_row(const query::clustering_range& ck_range_snapshot, const std::optional& last_row, const std::optional& last_rts) { return in_alloc_section([&] () -> mutation_fragment_opt { - maybe_refresh_state(ck_range, last_row, last_rts); + maybe_refresh_state(ck_range_snapshot, last_row, last_rts); - position_in_partition::equal_compare rows_eq(_schema); + position_in_partition::equal_compare rows_eq(_query_schema); while (has_more_rows()) { const rows_entry& e = pop_clustering_row(); if (e.dummy()) { continue; } if (_digest_requested) { - e.row().cells().prepare_hash(_schema, column_kind::regular_column); + e.row().cells().prepare_hash(_query_schema, column_kind::regular_column); } - auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _schema, _permit, _schema, e); + auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _query_schema, _permit, _query_schema, e); + // TODO: Ideally this should be position() or position().reversed(), depending on Reversing. while (has_more_rows() && rows_eq(peek_row().position(), result.as_clustering_row().position())) { const rows_entry& e = pop_clustering_row(); if (_digest_requested) { - e.row().cells().prepare_hash(_schema, column_kind::regular_column); + e.row().cells().prepare_hash(_query_schema, column_kind::regular_column); } - result.mutate_as_clustering_row(_schema, [&] (clustering_row& cr) mutable { - cr.apply(_schema, e); + result.mutate_as_clustering_row(_query_schema, [&] (clustering_row& cr) mutable { + cr.apply(_query_schema, e); }); } return result; @@ -232,7 +295,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public }); } - mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range, + mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range_snapshot, + const query::clustering_range& ck_range_query, const std::optional& last_row, const std::optional& last_rts, position_in_partition_view pos) { @@ -240,20 +304,21 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return _rt_stream.get_next(std::move(pos)); } return in_alloc_section([&] () -> mutation_fragment_opt { - maybe_refresh_state(ck_range, last_row, last_rts); + maybe_refresh_state(ck_range_snapshot, last_row, last_rts); + + position_in_partition::less_compare rt_less(_query_schema); - position_in_partition::less_compare rt_less(_schema); while (has_more_range_tombstones() && !rt_less(pos, peek_range_tombstone().position()) && (_rt_stream.empty() || !rt_less(_rt_stream.peek_next().position(), peek_range_tombstone().position()))) { range_tombstone rt = pop_range_tombstone(); - if (rt.trim(_schema, - position_in_partition_view::for_range_start(ck_range), - position_in_partition_view::for_range_end(ck_range))) { + + if (rt.trim(_query_schema, + position_in_partition_view::for_range_start(ck_range_query), + position_in_partition_view::for_range_end(ck_range_query))) { _rt_stream.apply(std::move(rt)); } } - return _rt_stream.get_next(std::move(pos)); }); } @@ -263,10 +328,16 @@ private: // that its lifetime is appropriately extended. boost::any _container_guard; + // Each range from _ck_ranges are taken to be in snapshot clustering key + // order, i.e. given a comparator derived from snapshot schema, for each ck_range from + // _ck_ranges, begin(ck_range) <= end(ck_range). query::clustering_key_filter_ranges _ck_ranges; query::clustering_row_ranges::const_iterator _current_ck_range; query::clustering_row_ranges::const_iterator _ck_range_end; + // Holds reversed current clustering key range, if Reversing was needed. + std::optional opt_reversed_range; + std::optional _last_entry; std::optional _last_rts; mutation_fragment_opt _next_row; @@ -287,14 +358,21 @@ private: } mutation_fragment_opt read_next() { + // We use the names ck_range_snapshot and ck_range_query to denote clustering order. + // ck_range_snapshot uses the snapshot order, while ck_range_query uses the + // query order. These two differ if the query was reversed (`Reversing==true`). + const auto& ck_range_snapshot = *_current_ck_range; + const auto& ck_range_query = opt_reversed_range ? *opt_reversed_range : ck_range_snapshot; + if (!_next_row && !_no_more_rows_in_current_range) { - _next_row = _reader.next_row(*_current_ck_range, _last_entry, _last_rts); + _next_row = _reader.next_row(ck_range_snapshot, _last_entry, _last_rts); } + if (_next_row) { auto pos_view = _next_row->as_clustering_row().position(); _last_entry = position_in_partition(pos_view); - auto mf = _reader.next_range_tombstone(*_current_ck_range, _last_entry, _last_rts, pos_view); + auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, pos_view); if (mf) { _last_rts = mf->as_range_tombstone().position(); return mf; @@ -302,7 +380,7 @@ private: return std::exchange(_next_row, {}); } else { _no_more_rows_in_current_range = true; - auto mf = _reader.next_range_tombstone(*_current_ck_range, _last_entry, _last_rts, position_in_partition_view::for_range_end(*_current_ck_range)); + auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, position_in_partition_view::for_range_end(ck_range_query)); if (mf) { _last_rts = mf->as_range_tombstone().position(); } @@ -325,6 +403,15 @@ private: _no_more_rows_in_current_range = false; } + void fill_opt_reversed_range() { + opt_reversed_range = std::nullopt; + if (_current_ck_range != _ck_range_end) { + if constexpr (Reversing) { + opt_reversed_range = query::reverse(*_current_ck_range); + } + } + } + void do_fill_buffer() { while (!is_end_of_stream() && !is_buffer_full()) { auto mfopt = read_next(); @@ -334,6 +421,7 @@ private: _last_entry = std::nullopt; _last_rts = std::nullopt; _current_ck_range = std::next(_current_ck_range); + fill_opt_reversed_range(); on_new_range(); } if (need_preempt()) { @@ -355,6 +443,7 @@ public: , _ck_range_end(_ck_ranges.end()) , _reader(*_schema, _permit, std::move(snp), region, read_section, digest_requested) { + fill_opt_reversed_range(); _reader.with_reserve([&] { push_mutation_fragment(*_schema, _permit, partition_start(std::move(dk), _reader.partition_tombstone())); }); @@ -392,7 +481,7 @@ public: } }; -template +template inline flat_mutation_reader make_partition_snapshot_flat_reader(schema_ptr s, reader_permit permit, @@ -406,7 +495,7 @@ make_partition_snapshot_flat_reader(schema_ptr s, streamed_mutation::forwarding fwd, Args&&... args) { - auto res = make_flat_mutation_reader>(std::move(s), std::move(permit), std::move(dk), + auto res = make_flat_mutation_reader>(std::move(s), std::move(permit), std::move(dk), snp, std::move(crr), digest_requested, region, read_section, std::move(pointer_to_container), std::forward(args)...); if (fwd) { return make_forwardable(std::move(res)); // FIXME: optimize diff --git a/query-request.hh b/query-request.hh index 555d249197..995f15ddfc 100644 --- a/query-request.hh +++ b/query-request.hh @@ -45,6 +45,16 @@ using range = wrapping_range; using ring_position = dht::ring_position; using clustering_range = nonwrapping_range; +// If `range` was supposed to be used with a comparator `cmp`, then +// `reverse(range)` is supposed to be used with a reversed comparator `c`. +// For instance, if it does make sense to do +// range.contains(point, cmp); +// then it also makes sense to do +// reversed(range).contains(point, [](auto x, auto y) { return cmp(y, x); }); +// but it doesn't make sense to do +// reversed(range).contains(point, cmp); +clustering_range reverse(const clustering_range& range); + extern const dht::partition_range full_partition_range; extern const clustering_range full_clustering_range; diff --git a/query.cc b/query.cc index ec12a6239a..f8cdb23cad 100644 --- a/query.cc +++ b/query.cc @@ -115,11 +115,18 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range reversed ? position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed); } + +clustering_range reverse(const clustering_range& range) { + if (range.is_singular()) { + return range; + } + return clustering_range(range.end(), range.start()); +} + + static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) { for (auto& range : ranges) { - if (!range.is_singular()) { - range = query::clustering_range(range.end(), range.start()); - } + range = reverse(range); } } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 38e4a27b9d..f58cf99d9f 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1328,6 +1328,11 @@ endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_v query::max_result_size storage_proxy::get_max_result_size(const query::partition_slice& slice) const { // Unpaged and reverse queries. + // FIXME: some reversed queries are no longer unlimited. + // To filter these out we need to know if it's a single partition query. + // Take the partition key range as a parameter? This would require changing many call sites + // where the partition key range is not immediately available. + // For now we ignore the issue, return the 'wrong' limit for some reversed queries, and wait until all reversed queries are fixed. if (!slice.options.contains() || slice.options.contains()) { return _db.local().get_unlimited_query_max_result_size(); } else { diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 1701698799..bcf502033d 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4578,6 +4578,29 @@ SEASTAR_TEST_CASE(test_impossible_where) { }); } +// FIXME: copy-pasta +static bool has_more_pages(::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + return rows->rs().get_metadata().flags().contains(cql3::metadata::flag::HAS_MORE_PAGES); +}; + +static size_t count_rows_fetched(::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + return rows->rs().result_set().size(); +}; + +static lw_shared_ptr extract_paging_state(::shared_ptr res) { + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + auto paging_state = rows->rs().get_metadata().paging_state(); + if (!paging_state) { + return nullptr; + } + return make_lw_shared(*paging_state); +}; + SEASTAR_THREAD_TEST_CASE(test_query_limit) { cql_test_config cfg; @@ -4617,23 +4640,43 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) { for (auto is_paged : {true, false}) { for (auto is_reversed : {true, false}) { for (auto scheduling_group : {db.get_statement_scheduling_group(), db.get_streaming_scheduling_group(), default_scheduling_group()}) { - const auto should_fail = (!is_paged || is_reversed) && scheduling_group == db.get_statement_scheduling_group(); + const auto should_fail = !is_paged && scheduling_group == db.get_statement_scheduling_group(); testlog.info("checking: is_paged={}, is_reversed={}, scheduling_group={}, should_fail={}", is_paged, is_reversed, scheduling_group.name(), should_fail); const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC"); int32_t page_size = is_paged ? 10000 : -1; - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, - cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()}); - const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows; + const auto& expected_rows = is_reversed ? reversed_rows : normal_rows; try { - auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr qo) { - return e.execute_cql(q, std::move(qo)); - }, select_query, std::move(qo)).get0(); - assert_that(std::move(result)) + bool has_more_pages = true; + lw_shared_ptr paging_state = nullptr; + size_t next_expected_row_idx = 0; + while (has_more_pages) { + // FIXME: even though we chose a large page size, for reversed queries we may still obtain multiple pages. + // This happens because the query result size limit for reversed queries is set to the 'unlimited query hard limit' + // (even though single-partition reversed queries are no longer 'unlimited') which we set to a low value, + // causing reversed queries to finish early. + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, + cql3::query_options::specific_options{page_size, paging_state, {}, api::new_timestamp()}); + auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr qo) { + return e.execute_cql(q, std::move(qo)); + }, select_query, std::move(qo)).get0(); + + auto rows_fetched = count_rows_fetched(result); + BOOST_REQUIRE(next_expected_row_idx + rows_fetched <= expected_rows.size()); + assert_that(result) .is_rows() - .with_rows(*expected_rows); + .with_rows(std::vector>{ + expected_rows.begin() + next_expected_row_idx, + expected_rows.begin() + next_expected_row_idx + rows_fetched}); + + has_more_pages = ::has_more_pages(result); + paging_state = extract_paging_state(result); + BOOST_REQUIRE(!has_more_pages || paging_state); + next_expected_row_idx += rows_fetched; + } + BOOST_REQUIRE(next_expected_row_idx == expected_rows.size()); if (should_fail) { BOOST_FAIL("Expected exception, but none was thrown.");