From bfb6858e555d4bcf97c974a1ccfe7d2584b5044d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 7 Apr 2017 10:32:19 +0200 Subject: [PATCH] sstables: mutation_reader: Let clustering_ranges_walker handle the _fwd_range start Simplifies the code a bit, but also will make it easier to calculate the next position we should skip to after forwarding, taking into consideration both the position forwarded to as well as clustering ranges of the query. That will be just calling _ck_ranges_walker->lower_bound() after it is trimmed. --- sstables/partition.cc | 81 +++++++++++++++++-------------------------- 1 file changed, 32 insertions(+), 49 deletions(-) diff --git a/sstables/partition.cc b/sstables/partition.cc index f8b76d41b2..7499f77169 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -142,17 +142,15 @@ private: stdx::optional _mutation; bool _is_mutation_end = true; - position_range _fwd_range = position_range::full(); // Restricts the stream on top of _ck_ranges. + position_in_partition _fwd_end = position_in_partition::after_all_clustered_rows(); // Restricts the stream on top of _ck_ranges_walker. streamed_mutation::forwarding _fwd; - bool _after_fwd_range_start = false; // Because of #1203 we may encounter sstables with range tombstones // placed earlier than expected. We fix the ordering by loading range tombstones // initially into _range_tombstones, until first row is encountered, // and then merge the two streams in push_ready_fragments(). // - // _range_tombstones holds only tombstones which are inside _ck_ranges and - // after current _fwd_range.start(). + // _range_tombstones holds only tombstones which are relevant for current ranges. range_tombstone_stream _range_tombstones; bool _first_row_encountered = false; public: @@ -313,28 +311,15 @@ private: void advance_to(position_in_partition_view pos) { position_in_partition::less_compare less(*_schema); - auto log = [&] { - sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress); - }; - - if (!_after_fwd_range_start && less(pos, _fwd_range.start())) { - _skip_in_progress = true; - log(); - return; - } - - _after_fwd_range_start = true; - - if (!less(pos, _fwd_range.end())) { + if (!less(pos, _fwd_end)) { _out_of_range = true; _skip_in_progress = false; - log(); - return; + } else { + _skip_in_progress = !_ck_ranges_walker->advance_to(pos); + _out_of_range |= _ck_ranges_walker->out_of_range(); } - _skip_in_progress = !pos.is_static_row() && !_ck_ranges_walker->advance_to(pos); - _out_of_range |= _ck_ranges_walker->out_of_range(); - log(); + sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress); } // Assumes that this and other advance_to() overloads are called with monotonic positions. @@ -343,26 +328,15 @@ private: auto&& start = rt.position(); auto&& end = rt.end_position(); - auto log = [&] { - sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress); - }; - - if (less(end, _fwd_range.start())) { - _skip_in_progress = true; - log(); - return; - } - - if (!less(start, _fwd_range.end())) { + if (!less(start, _fwd_end)) { _out_of_range = true; _skip_in_progress = false; // It may become in range after next forwarding, so cannot drop it - log(); - return; + } else { + _skip_in_progress = !_ck_ranges_walker->advance_to(start, end); + _out_of_range |= _ck_ranges_walker->out_of_range(); } - _skip_in_progress = !_ck_ranges_walker->advance_to(start, end); - _out_of_range |= _ck_ranges_walker->out_of_range(); - log(); + sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress); } void advance_to(const mutation_fragment& mf) { @@ -377,9 +351,8 @@ private: sstlog.trace("mp_row_consumer {}: set_up_ck_ranges({})", this, pk); _ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk); _ck_ranges_walker = clustering_ranges_walker(*_schema, _ck_ranges->ranges()); - _fwd_range = _fwd ? position_range::for_static_row() : position_range::full(); + _fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows(); _out_of_range = false; - _after_fwd_range_start = false; _range_tombstones.reset(); _first_row_encountered = false; } @@ -736,7 +709,7 @@ public: } // Workaround for #1203 if (!_first_row_encountered) { - if (!less(rt_pos, _fwd_range.start()) && _ck_ranges_walker->advance_to(rt_pos, rt.end_position())) { + if (_ck_ranges_walker->advance_to(rt_pos, rt.end_position())) { _range_tombstones.apply(std::move(rt)); } return proceed::yes; @@ -951,7 +924,7 @@ row_consumer::proceed mp_row_consumer::push_ready_fragments_out_of_range() { // Emit all range tombstones relevant to the current forwarding range first. while (!_sm->is_buffer_full()) { - auto mfo = _range_tombstones.get_next(_fwd_range.end()); + auto mfo = _range_tombstones.get_next(_fwd_end); if (!mfo) { _sm->_end_of_stream = true; break; @@ -975,13 +948,23 @@ mp_row_consumer::push_ready_fragments() { } future<> mp_row_consumer::fast_forward_to(position_range r) { - _fwd_range = std::move(r); + sstlog.trace("mp_row_consumer {}: fast_forward_to({})", this, r); _out_of_range = _is_mutation_end; - _after_fwd_range_start = false; + _fwd_end = std::move(r).end(); - _range_tombstones.forward_to(_fwd_range.start()); + _range_tombstones.forward_to(r.start()); - if (_ready && !_ready->relevant_for_range(*_schema, _fwd_range.start())) { + _ck_ranges_walker->trim_front(std::move(r).start()); + if (_ck_ranges_walker->out_of_range()) { + _out_of_range = true; + _ready = {}; + sstlog.trace("mp_row_consumer {}: no more ranges", this); + return make_ready_future<>(); + } + + auto start = _ck_ranges_walker->lower_bound(); + + if (_ready && !_ready->relevant_for_range(*_schema, start)) { _ready = {}; } @@ -989,11 +972,11 @@ future<> mp_row_consumer::fast_forward_to(position_range r) { advance_to(*_in_progress); } - sstlog.trace("mp_row_consumer {}: fast_forward_to({}) => out_of_range={}, skip_in_progress={}", this, _fwd_range, _out_of_range, _skip_in_progress); + sstlog.trace("mp_row_consumer {}: => out_of_range={}, skip_in_progress={}", this, _out_of_range, _skip_in_progress); if (!_in_progress || _skip_in_progress) { - sstlog.trace("mp_row_consumer {}: advance_context({})", this, _fwd_range.start()); - return _sm->advance_context(_fwd_range.start()); + sstlog.trace("mp_row_consumer {}: advance_context({})", this, start); + return _sm->advance_context(start); } return make_ready_future<>();