Merge 'Memtable make reversing reader' from Michał Radwański

Make a reader that reads from memtable in reverse order.

This draft PR includes two commits, out of which only the second is
relevant for review.

Described in #9133.
Refs #1413.

Closes #9174

* github.com:scylladb/scylla:
  partition_snapshot_reader: pop_range_tombstone returns reference (instead of value) when possible.
  memtable: enable native reversing
  partition_snapshot_reader: reverse ck_range when needed by Reversing
  memtable, partition_snapshot_reader: read from partition in reverse
  partition_snapshot_reader: rows_position and rows_iter_type supporting reverse iteration
  partition_snapshot_reader: split responsibility of ck_range
  partition_snapshot_reader: separate _schema into _query_schema and _partition_schema
  query: reverse clustering_range
  test: cql_query_test: fix test_query_limit for reversed queries
This commit is contained in:
Tomasz Grabiec
2021-10-13 15:01:26 +02:00
committed by Avi Kivity
7 changed files with 265 additions and 101 deletions

View File

@@ -26,6 +26,18 @@
#include "partition_builder.hh"
#include "mutation_partition_view.hh"
static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
bool is_reversed,
reader_permit permit,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,
boost::any pointer_to_container,
streamed_mutation::forwarding fwd, memtable& memtable);
void memtable::memtable_encoding_stats_collector::update_timestamp(api::timestamp_type ts) {
if (ts != api::missing_timestamp) {
encoding_stats_collector::update_timestamp(ts);
@@ -462,11 +474,15 @@ public:
});
if (key_and_snp) {
update_last(key_and_snp->first);
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key());
const query::clustering_row_ranges& ranges = _slice.row_ranges(*schema(), key_and_snp->first.key());
// TODO: when the slice passed from query finally changes format from half-reversed into native reversed, this line needs to change.
auto cr = query::clustering_key_filter_ranges(ranges);
auto snp_schema = key_and_snp->second->schema();
bool digest_requested = _slice.options.contains<query::partition_slice::option::with_digest>();
auto mpsr = make_partition_snapshot_flat_reader<partition_snapshot_read_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
bool is_reversed = _slice.options.contains(query::partition_slice::option::reversed);
auto mpsr = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
mpsr.upgrade_schema(schema());
_delegate = std::move(mpsr);
} else {
@@ -582,6 +598,25 @@ public:
}
};
static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
bool is_reversed,
reader_permit permit,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,
boost::any pointer_to_container,
streamed_mutation::forwarding fwd, memtable& memtable) {
if (is_reversed) {
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
} else {
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
}
}
class flush_reader final : public flat_mutation_reader::impl, private iterator_reader {
// FIXME: Similarly to scanning_reader we have an underlying
// flat_mutation_reader for each partition. This is suboptimal.
@@ -618,7 +653,7 @@ private:
update_last(key_and_snp->first);
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key());
auto snp_schema = key_and_snp->second->schema();
auto mpsr = make_partition_snapshot_flat_reader<partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
auto mpsr = make_partition_snapshot_flat_reader<false, partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
mpsr.upgrade_schema(schema());
_partition_reader = std::move(mpsr);
@@ -671,7 +706,7 @@ partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
}
flat_mutation_reader
memtable::do_make_flat_reader(schema_ptr s,
memtable::make_flat_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -679,6 +714,7 @@ memtable::do_make_flat_reader(schema_ptr s,
tracing::trace_state_ptr trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
bool is_reversed = slice.options.contains(query::partition_slice::option::reversed);
if (query::is_single_partition(range) && !fwd_mr) {
const query::ring_position& pos = range.start()->value();
auto snp = _read_section(*this, [&] () -> partition_snapshot_ptr {
@@ -694,49 +730,25 @@ memtable::do_make_flat_reader(schema_ptr s,
return make_empty_flat_reader(std::move(s), std::move(permit));
}
auto dk = pos.as_decorated_key();
auto cr = query::clustering_key_filter_ranges::get_ranges(*s, slice, dk.key());
auto snp_schema = snp->schema();
const query::clustering_row_ranges& ranges = slice.row_ranges(*s, dk.key());
// TODO: when the slice passed from query finally changes format from half-reversed into native reversed, this line needs to change.
auto cr = query::clustering_key_filter_ranges(ranges);
bool digest_requested = slice.options.contains<query::partition_slice::option::with_digest>();
auto rd = make_partition_snapshot_flat_reader<partition_snapshot_read_accounter>(snp_schema, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested,
*this, _read_section, shared_from_this(), fwd, *this);
auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _read_section, shared_from_this(), fwd, *this);
rd.upgrade_schema(s);
return rd;
} else {
return make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr);
auto res = make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr);
if (fwd == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(res));
} else {
return res;
}
}
}
flat_mutation_reader
memtable::make_flat_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& query_slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
// When the memtable is flushed while a scanning read is ongoing, an sstable
// reader is created to replace the memtable reader mid-air. This sstable
// reader will get the memtable's slice. We don't want this sstable reader
// to read in reverse as we are reversing the stream on top of the memtable
// reader. Unreverse the slice here so when it is passed to the sstable
// reader it doesn't try to read in reverse. This is not required technically
// for single partition reads, but we do it anyway to keep things simple.
std::unique_ptr<query::partition_slice> unreversed_slice;
const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed);
auto fwd_sm = fwd;
if (reversed) {
fwd_sm = streamed_mutation::forwarding::no;
s = s->make_reversed();
unreversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*s, query_slice));
}
const auto& slice = reversed ? *unreversed_slice : query_slice;
auto rd = do_make_flat_reader(std::move(s), permit, range, slice, pc, std::move(trace_state_ptr), fwd_sm, fwd_mr);
if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
}
if (fwd && (reversed || !query::is_single_partition(range) || fwd_mr)) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
flat_mutation_reader
memtable::make_flush_reader(schema_ptr s, reader_permit permit, const io_priority_class& pc) {
if (group()) {

View File

@@ -177,8 +177,6 @@ private:
void remove_flushed_memory(uint64_t);
void clear() noexcept;
uint64_t dirty_size() const;
flat_mutation_reader do_make_flat_reader(schema_ptr, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr);
public:
explicit memtable(schema_ptr schema, dirty_memory_manager&, table_stats& table_stats, memtable_list *memtable_list = nullptr,
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group());

View File

@@ -24,24 +24,45 @@
#include "partition_version.hh"
#include "flat_mutation_reader.hh"
#include "clustering_key_filter.hh"
#include "query-request.hh"
#include <boost/range/algorithm/heap_algorithm.hpp>
template <typename Accounter>
template <bool Reversing, typename Accounter>
class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public Accounter {
using rows_iter_type = std::conditional_t<Reversing,
mutation_partition::rows_type::const_reverse_iterator,
mutation_partition::rows_type::const_iterator>;
struct rows_position {
mutation_partition::rows_type::const_iterator _position;
mutation_partition::rows_type::const_iterator _end;
rows_iter_type _position, _end;
};
static rows_iter_type make_iterator(mutation_partition::rows_type::const_iterator it) {
if constexpr (Reversing) {
return std::make_reverse_iterator(it);
} else {
return it;
}
}
class heap_compare {
position_in_partition::less_compare _less;
public:
// `s` shall be native to the query clustering order.
explicit heap_compare(const schema& s) : _less(s) { }
bool operator()(const rows_position& a, const rows_position& b) {
return _less(b._position->position(), a._position->position());
if constexpr (Reversing) {
// Here `reversed()` doesn't change anything, but keep it for consistency.
return _less(b._position->position().reversed(), a._position->position().reversed());
} else {
return _less(b._position->position(), a._position->position());
}
}
bool operator()(const range_tombstone_list::iterator_range& a, const range_tombstone_list::iterator_range& b) {
return _less(b.front().position(), a.front().position());
if constexpr (Reversing) {
return _less(b.back().end_position().reversed(), a.back().end_position().reversed());
} else {
return _less(b.front().position(), a.front().position());
}
}
};
@@ -50,7 +71,14 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
// snapshot, references to region and alloc section) or dropped on any
// allocation section retry (_clustering_rows).
class lsa_partition_reader {
const schema& _schema;
// _query_schema can be used to retrieve the clustering key order which is used
// for result ordering. This schema is passed from the query and is reversed iff
// the query was reversed (i.e. `Reversing==true`).
const schema& _query_schema;
// _snapshot_schema is a schema that induces the same clustering key order as the
// schema from the underlying snapshot. The schemas mentioned might differ, for
// instance, if a query used newer version of the schema.
const schema_ptr _snapshot_schema;
reader_permit _permit;
heap_compare _heap_cmp;
@@ -73,32 +101,51 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return fn();
});
}
void maybe_refresh_state(const query::clustering_range& ck_range,
void maybe_refresh_state(const query::clustering_range& ck_range_snapshot,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts) {
auto mark = _snapshot->get_change_mark();
if (mark != _change_mark) {
do_refresh_state(ck_range, last_row, last_rts);
do_refresh_state(ck_range_snapshot, last_row, last_rts);
_change_mark = mark;
}
}
void do_refresh_state(const query::clustering_range& ck_range,
// In reversing mode, upper and lower bounds still need to be executed against
// snapshot schema and ck_range, however we need them to search from "opposite" direction.
template<typename T, typename... Args>
static rows_iter_type lower_bound(const T& t, Args... args) {
if constexpr (Reversing) {
return make_iterator(t.upper_bound(std::forward<Args>(args)...));
} else {
return make_iterator(t.lower_bound(std::forward<Args>(args)...));
}
}
template<typename T, typename... Args>
static rows_iter_type upper_bound(const T& t, Args... args) {
if constexpr (Reversing) {
return make_iterator(t.lower_bound(std::forward<Args>(args)...));
} else {
return make_iterator(t.upper_bound(std::forward<Args>(args)...));
}
}
void do_refresh_state(const query::clustering_range& ck_range_snapshot,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts) {
_clustering_rows.clear();
_range_tombstones.clear();
rows_entry::tri_compare rows_cmp(_schema);
rows_entry::tri_compare rows_cmp(*_snapshot_schema);
for (auto&& v : _snapshot->versions()) {
mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(_schema, ck_range);
auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
auto cr = [&] () {
if (last_row) {
return v.partition().clustered_rows().upper_bound(*last_row, rows_cmp);
return upper_bound(v.partition().clustered_rows(), *last_row, rows_cmp);
} else {
return v.partition().lower_bound(_schema, ck_range);
return lower_bound(v.partition(), *_snapshot_schema, ck_range_snapshot);
}
}();
auto cr_end = upper_bound(v.partition(), *_snapshot_schema, ck_range_snapshot);
if (cr != cr_end) {
_clustering_rows.emplace_back(rows_position { cr, cr_end });
@@ -106,9 +153,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
range_tombstone_list::iterator_range rt_slice = [&] () {
if (last_rts) {
return v.partition().row_tombstones().upper_slice(_schema, *last_rts, bound_view::from_range_end(ck_range));
return v.partition().row_tombstones().upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range_snapshot));
} else {
return v.partition().row_tombstones().slice(_schema, ck_range);
return v.partition().row_tombstones().slice(*_snapshot_schema, ck_range_snapshot);
}
}();
if (rt_slice.begin() != rt_slice.end()) {
@@ -133,11 +180,16 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return e;
}
const range_tombstone& pop_range_tombstone() {
range_tombstone pop_range_tombstone() {
boost::range::pop_heap(_range_tombstones, _heap_cmp);
auto& current = _range_tombstones.back();
const range_tombstone& rt = current.begin()->tombstone();
current.advance_begin(1);
range_tombstone rt = (Reversing ? std::prev(current.end()) : current.begin())->tombstone();
if constexpr (Reversing) {
current.advance_end(-1);
rt.reverse();
} else {
current.advance_begin(1);
}
if (current.begin() == current.end()) {
_range_tombstones.pop_back();
} else {
@@ -154,8 +206,17 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return !_clustering_rows.empty();
}
const range_tombstone& peek_range_tombstone() const {
return _range_tombstones.front().begin()->tombstone();
// Let's not lose performance when not Reversing.
using peeked_range_tombstone = std::conditional_t<Reversing, range_tombstone, const range_tombstone&>;
peeked_range_tombstone peek_range_tombstone() const {
if constexpr (Reversing) {
range_tombstone rt = std::prev(_range_tombstones.front().end())->tombstone();
rt.reverse();
return rt;
} else {
return _range_tombstones.front().begin()->tombstone();
}
}
bool has_more_range_tombstones() const {
return !_range_tombstones.empty();
@@ -164,7 +225,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
explicit lsa_partition_reader(const schema& s, reader_permit permit, partition_snapshot_ptr snp,
logalloc::region& region, logalloc::allocating_section& read_section,
bool digest_requested)
: _schema(s)
: _query_schema(s)
, _snapshot_schema(Reversing ? s.make_reversed() : s.shared_from_this())
, _permit(permit)
, _heap_cmp(s)
, _snapshot(std::move(snp))
@@ -174,8 +236,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
, _digest_requested(digest_requested)
{ }
void reset_state(const query::clustering_range& ck_range) {
do_refresh_state(ck_range, {}, {});
void reset_state(const query::clustering_range& ck_range_snapshot) {
do_refresh_state(ck_range_snapshot, {}, {});
}
template<typename Function>
@@ -195,35 +257,36 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
}
// Returns next clustered row in the range.
// If the ck_range is the same as the one used previously last_row needs
// If the ck_range_snapshot is the same as the one used previously last_row needs
// to be engaged and equal the position of the row returned last time.
// If the ck_range is different or this is the first call to this
// If the ck_range_snapshot is different or this is the first call to this
// function last_row has to be disengaged. Additionally, when entering
// new range _rt_stream will be populated with all relevant
// tombstones.
mutation_fragment_opt next_row(const query::clustering_range& ck_range,
mutation_fragment_opt next_row(const query::clustering_range& ck_range_snapshot,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts) {
return in_alloc_section([&] () -> mutation_fragment_opt {
maybe_refresh_state(ck_range, last_row, last_rts);
maybe_refresh_state(ck_range_snapshot, last_row, last_rts);
position_in_partition::equal_compare rows_eq(_schema);
position_in_partition::equal_compare rows_eq(_query_schema);
while (has_more_rows()) {
const rows_entry& e = pop_clustering_row();
if (e.dummy()) {
continue;
}
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
e.row().cells().prepare_hash(_query_schema, column_kind::regular_column);
}
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _schema, _permit, _schema, e);
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _query_schema, _permit, _query_schema, e);
// TODO: Ideally this should be position() or position().reversed(), depending on Reversing.
while (has_more_rows() && rows_eq(peek_row().position(), result.as_clustering_row().position())) {
const rows_entry& e = pop_clustering_row();
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
e.row().cells().prepare_hash(_query_schema, column_kind::regular_column);
}
result.mutate_as_clustering_row(_schema, [&] (clustering_row& cr) mutable {
cr.apply(_schema, e);
result.mutate_as_clustering_row(_query_schema, [&] (clustering_row& cr) mutable {
cr.apply(_query_schema, e);
});
}
return result;
@@ -232,7 +295,8 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
});
}
mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range,
mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range_snapshot,
const query::clustering_range& ck_range_query,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts,
position_in_partition_view pos) {
@@ -240,20 +304,21 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return _rt_stream.get_next(std::move(pos));
}
return in_alloc_section([&] () -> mutation_fragment_opt {
maybe_refresh_state(ck_range, last_row, last_rts);
maybe_refresh_state(ck_range_snapshot, last_row, last_rts);
position_in_partition::less_compare rt_less(_query_schema);
position_in_partition::less_compare rt_less(_schema);
while (has_more_range_tombstones()
&& !rt_less(pos, peek_range_tombstone().position())
&& (_rt_stream.empty() || !rt_less(_rt_stream.peek_next().position(), peek_range_tombstone().position()))) {
range_tombstone rt = pop_range_tombstone();
if (rt.trim(_schema,
position_in_partition_view::for_range_start(ck_range),
position_in_partition_view::for_range_end(ck_range))) {
if (rt.trim(_query_schema,
position_in_partition_view::for_range_start(ck_range_query),
position_in_partition_view::for_range_end(ck_range_query))) {
_rt_stream.apply(std::move(rt));
}
}
return _rt_stream.get_next(std::move(pos));
});
}
@@ -263,10 +328,16 @@ private:
// that its lifetime is appropriately extended.
boost::any _container_guard;
// Each range from _ck_ranges are taken to be in snapshot clustering key
// order, i.e. given a comparator derived from snapshot schema, for each ck_range from
// _ck_ranges, begin(ck_range) <= end(ck_range).
query::clustering_key_filter_ranges _ck_ranges;
query::clustering_row_ranges::const_iterator _current_ck_range;
query::clustering_row_ranges::const_iterator _ck_range_end;
// Holds reversed current clustering key range, if Reversing was needed.
std::optional<query::clustering_range> opt_reversed_range;
std::optional<position_in_partition> _last_entry;
std::optional<position_in_partition> _last_rts;
mutation_fragment_opt _next_row;
@@ -287,14 +358,21 @@ private:
}
mutation_fragment_opt read_next() {
// We use the names ck_range_snapshot and ck_range_query to denote clustering order.
// ck_range_snapshot uses the snapshot order, while ck_range_query uses the
// query order. These two differ if the query was reversed (`Reversing==true`).
const auto& ck_range_snapshot = *_current_ck_range;
const auto& ck_range_query = opt_reversed_range ? *opt_reversed_range : ck_range_snapshot;
if (!_next_row && !_no_more_rows_in_current_range) {
_next_row = _reader.next_row(*_current_ck_range, _last_entry, _last_rts);
_next_row = _reader.next_row(ck_range_snapshot, _last_entry, _last_rts);
}
if (_next_row) {
auto pos_view = _next_row->as_clustering_row().position();
_last_entry = position_in_partition(pos_view);
auto mf = _reader.next_range_tombstone(*_current_ck_range, _last_entry, _last_rts, pos_view);
auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, pos_view);
if (mf) {
_last_rts = mf->as_range_tombstone().position();
return mf;
@@ -302,7 +380,7 @@ private:
return std::exchange(_next_row, {});
} else {
_no_more_rows_in_current_range = true;
auto mf = _reader.next_range_tombstone(*_current_ck_range, _last_entry, _last_rts, position_in_partition_view::for_range_end(*_current_ck_range));
auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, position_in_partition_view::for_range_end(ck_range_query));
if (mf) {
_last_rts = mf->as_range_tombstone().position();
}
@@ -325,6 +403,15 @@ private:
_no_more_rows_in_current_range = false;
}
void fill_opt_reversed_range() {
opt_reversed_range = std::nullopt;
if (_current_ck_range != _ck_range_end) {
if constexpr (Reversing) {
opt_reversed_range = query::reverse(*_current_ck_range);
}
}
}
void do_fill_buffer() {
while (!is_end_of_stream() && !is_buffer_full()) {
auto mfopt = read_next();
@@ -334,6 +421,7 @@ private:
_last_entry = std::nullopt;
_last_rts = std::nullopt;
_current_ck_range = std::next(_current_ck_range);
fill_opt_reversed_range();
on_new_range();
}
if (need_preempt()) {
@@ -355,6 +443,7 @@ public:
, _ck_range_end(_ck_ranges.end())
, _reader(*_schema, _permit, std::move(snp), region, read_section, digest_requested)
{
fill_opt_reversed_range();
_reader.with_reserve([&] {
push_mutation_fragment(*_schema, _permit, partition_start(std::move(dk), _reader.partition_tombstone()));
});
@@ -392,7 +481,7 @@ public:
}
};
template <typename Accounter, typename... Args>
template <bool Reversing, typename Accounter, typename... Args>
inline flat_mutation_reader
make_partition_snapshot_flat_reader(schema_ptr s,
reader_permit permit,
@@ -406,7 +495,7 @@ make_partition_snapshot_flat_reader(schema_ptr s,
streamed_mutation::forwarding fwd,
Args&&... args)
{
auto res = make_flat_mutation_reader<partition_snapshot_flat_reader<Accounter>>(std::move(s), std::move(permit), std::move(dk),
auto res = make_flat_mutation_reader<partition_snapshot_flat_reader<Reversing, Accounter>>(std::move(s), std::move(permit), std::move(dk),
snp, std::move(crr), digest_requested, region, read_section, std::move(pointer_to_container), std::forward<Args>(args)...);
if (fwd) {
return make_forwardable(std::move(res)); // FIXME: optimize

View File

@@ -45,6 +45,16 @@ using range = wrapping_range<T>;
using ring_position = dht::ring_position;
using clustering_range = nonwrapping_range<clustering_key_prefix>;
// If `range` was supposed to be used with a comparator `cmp`, then
// `reverse(range)` is supposed to be used with a reversed comparator `c`.
// For instance, if it does make sense to do
// range.contains(point, cmp);
// then it also makes sense to do
// reversed(range).contains(point, [](auto x, auto y) { return cmp(y, x); });
// but it doesn't make sense to do
// reversed(range).contains(point, cmp);
clustering_range reverse(const clustering_range& range);
extern const dht::partition_range full_partition_range;
extern const clustering_range full_clustering_range;

View File

@@ -115,11 +115,18 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range
reversed ? position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed);
}
clustering_range reverse(const clustering_range& range) {
if (range.is_singular()) {
return range;
}
return clustering_range(range.end(), range.start());
}
static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) {
for (auto& range : ranges) {
if (!range.is_singular()) {
range = query::clustering_range(range.end(), range.start());
}
range = reverse(range);
}
}

View File

@@ -1328,6 +1328,11 @@ endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_v
query::max_result_size storage_proxy::get_max_result_size(const query::partition_slice& slice) const {
// Unpaged and reverse queries.
// FIXME: some reversed queries are no longer unlimited.
// To filter these out we need to know if it's a single partition query.
// Take the partition key range as a parameter? This would require changing many call sites
// where the partition key range is not immediately available.
// For now we ignore the issue, return the 'wrong' limit for some reversed queries, and wait until all reversed queries are fixed.
if (!slice.options.contains<query::partition_slice::option::allow_short_read>() || slice.options.contains<query::partition_slice::option::reversed>()) {
return _db.local().get_unlimited_query_max_result_size();
} else {

View File

@@ -4578,6 +4578,29 @@ SEASTAR_TEST_CASE(test_impossible_where) {
});
}
// FIXME: copy-pasta
static bool has_more_pages(::shared_ptr<cql_transport::messages::result_message> res) {
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
BOOST_REQUIRE(rows);
return rows->rs().get_metadata().flags().contains(cql3::metadata::flag::HAS_MORE_PAGES);
};
static size_t count_rows_fetched(::shared_ptr<cql_transport::messages::result_message> res) {
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
BOOST_REQUIRE(rows);
return rows->rs().result_set().size();
};
static lw_shared_ptr<service::pager::paging_state> extract_paging_state(::shared_ptr<cql_transport::messages::result_message> res) {
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
BOOST_REQUIRE(rows);
auto paging_state = rows->rs().get_metadata().paging_state();
if (!paging_state) {
return nullptr;
}
return make_lw_shared<service::pager::paging_state>(*paging_state);
};
SEASTAR_THREAD_TEST_CASE(test_query_limit) {
cql_test_config cfg;
@@ -4617,23 +4640,43 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
for (auto is_paged : {true, false}) {
for (auto is_reversed : {true, false}) {
for (auto scheduling_group : {db.get_statement_scheduling_group(), db.get_streaming_scheduling_group(), default_scheduling_group()}) {
const auto should_fail = (!is_paged || is_reversed) && scheduling_group == db.get_statement_scheduling_group();
const auto should_fail = !is_paged && scheduling_group == db.get_statement_scheduling_group();
testlog.info("checking: is_paged={}, is_reversed={}, scheduling_group={}, should_fail={}", is_paged, is_reversed, scheduling_group.name(), should_fail);
const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC");
int32_t page_size = is_paged ? 10000 : -1;
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()});
const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows;
const auto& expected_rows = is_reversed ? reversed_rows : normal_rows;
try {
auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr<cql3::query_options> qo) {
return e.execute_cql(q, std::move(qo));
}, select_query, std::move(qo)).get0();
assert_that(std::move(result))
bool has_more_pages = true;
lw_shared_ptr<service::pager::paging_state> paging_state = nullptr;
size_t next_expected_row_idx = 0;
while (has_more_pages) {
// FIXME: even though we chose a large page size, for reversed queries we may still obtain multiple pages.
// This happens because the query result size limit for reversed queries is set to the 'unlimited query hard limit'
// (even though single-partition reversed queries are no longer 'unlimited') which we set to a low value,
// causing reversed queries to finish early.
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{page_size, paging_state, {}, api::new_timestamp()});
auto result = with_scheduling_group(scheduling_group, [&e] (const sstring& q, std::unique_ptr<cql3::query_options> qo) {
return e.execute_cql(q, std::move(qo));
}, select_query, std::move(qo)).get0();
auto rows_fetched = count_rows_fetched(result);
BOOST_REQUIRE(next_expected_row_idx + rows_fetched <= expected_rows.size());
assert_that(result)
.is_rows()
.with_rows(*expected_rows);
.with_rows(std::vector<std::vector<bytes_opt>>{
expected_rows.begin() + next_expected_row_idx,
expected_rows.begin() + next_expected_row_idx + rows_fetched});
has_more_pages = ::has_more_pages(result);
paging_state = extract_paging_state(result);
BOOST_REQUIRE(!has_more_pages || paging_state);
next_expected_row_idx += rows_fetched;
}
BOOST_REQUIRE(next_expected_row_idx == expected_rows.size());
if (should_fail) {
BOOST_FAIL("Expected exception, but none was thrown.");