diff --git a/sstables/partition.cc b/sstables/partition.cc index f8b76d41b2..7499f77169 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -142,17 +142,15 @@ private: stdx::optional _mutation; bool _is_mutation_end = true; - position_range _fwd_range = position_range::full(); // Restricts the stream on top of _ck_ranges. + position_in_partition _fwd_end = position_in_partition::after_all_clustered_rows(); // Restricts the stream on top of _ck_ranges_walker. streamed_mutation::forwarding _fwd; - bool _after_fwd_range_start = false; // Because of #1203 we may encounter sstables with range tombstones // placed earlier than expected. We fix the ordering by loading range tombstones // initially into _range_tombstones, until first row is encountered, // and then merge the two streams in push_ready_fragments(). // - // _range_tombstones holds only tombstones which are inside _ck_ranges and - // after current _fwd_range.start(). + // _range_tombstones holds only tombstones which are relevant for current ranges. range_tombstone_stream _range_tombstones; bool _first_row_encountered = false; public: @@ -313,28 +311,15 @@ private: void advance_to(position_in_partition_view pos) { position_in_partition::less_compare less(*_schema); - auto log = [&] { - sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress); - }; - - if (!_after_fwd_range_start && less(pos, _fwd_range.start())) { - _skip_in_progress = true; - log(); - return; - } - - _after_fwd_range_start = true; - - if (!less(pos, _fwd_range.end())) { + if (!less(pos, _fwd_end)) { _out_of_range = true; _skip_in_progress = false; - log(); - return; + } else { + _skip_in_progress = !_ck_ranges_walker->advance_to(pos); + _out_of_range |= _ck_ranges_walker->out_of_range(); } - _skip_in_progress = !pos.is_static_row() && !_ck_ranges_walker->advance_to(pos); - _out_of_range |= _ck_ranges_walker->out_of_range(); - log(); + sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress); } // Assumes that this and other advance_to() overloads are called with monotonic positions. @@ -343,26 +328,15 @@ private: auto&& start = rt.position(); auto&& end = rt.end_position(); - auto log = [&] { - sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress); - }; - - if (less(end, _fwd_range.start())) { - _skip_in_progress = true; - log(); - return; - } - - if (!less(start, _fwd_range.end())) { + if (!less(start, _fwd_end)) { _out_of_range = true; _skip_in_progress = false; // It may become in range after next forwarding, so cannot drop it - log(); - return; + } else { + _skip_in_progress = !_ck_ranges_walker->advance_to(start, end); + _out_of_range |= _ck_ranges_walker->out_of_range(); } - _skip_in_progress = !_ck_ranges_walker->advance_to(start, end); - _out_of_range |= _ck_ranges_walker->out_of_range(); - log(); + sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress); } void advance_to(const mutation_fragment& mf) { @@ -377,9 +351,8 @@ private: sstlog.trace("mp_row_consumer {}: set_up_ck_ranges({})", this, pk); _ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk); _ck_ranges_walker = clustering_ranges_walker(*_schema, _ck_ranges->ranges()); - _fwd_range = _fwd ? position_range::for_static_row() : position_range::full(); + _fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows(); _out_of_range = false; - _after_fwd_range_start = false; _range_tombstones.reset(); _first_row_encountered = false; } @@ -736,7 +709,7 @@ public: } // Workaround for #1203 if (!_first_row_encountered) { - if (!less(rt_pos, _fwd_range.start()) && _ck_ranges_walker->advance_to(rt_pos, rt.end_position())) { + if (_ck_ranges_walker->advance_to(rt_pos, rt.end_position())) { _range_tombstones.apply(std::move(rt)); } return proceed::yes; @@ -951,7 +924,7 @@ row_consumer::proceed mp_row_consumer::push_ready_fragments_out_of_range() { // Emit all range tombstones relevant to the current forwarding range first. while (!_sm->is_buffer_full()) { - auto mfo = _range_tombstones.get_next(_fwd_range.end()); + auto mfo = _range_tombstones.get_next(_fwd_end); if (!mfo) { _sm->_end_of_stream = true; break; @@ -975,13 +948,23 @@ mp_row_consumer::push_ready_fragments() { } future<> mp_row_consumer::fast_forward_to(position_range r) { - _fwd_range = std::move(r); + sstlog.trace("mp_row_consumer {}: fast_forward_to({})", this, r); _out_of_range = _is_mutation_end; - _after_fwd_range_start = false; + _fwd_end = std::move(r).end(); - _range_tombstones.forward_to(_fwd_range.start()); + _range_tombstones.forward_to(r.start()); - if (_ready && !_ready->relevant_for_range(*_schema, _fwd_range.start())) { + _ck_ranges_walker->trim_front(std::move(r).start()); + if (_ck_ranges_walker->out_of_range()) { + _out_of_range = true; + _ready = {}; + sstlog.trace("mp_row_consumer {}: no more ranges", this); + return make_ready_future<>(); + } + + auto start = _ck_ranges_walker->lower_bound(); + + if (_ready && !_ready->relevant_for_range(*_schema, start)) { _ready = {}; } @@ -989,11 +972,11 @@ future<> mp_row_consumer::fast_forward_to(position_range r) { advance_to(*_in_progress); } - sstlog.trace("mp_row_consumer {}: fast_forward_to({}) => out_of_range={}, skip_in_progress={}", this, _fwd_range, _out_of_range, _skip_in_progress); + sstlog.trace("mp_row_consumer {}: => out_of_range={}, skip_in_progress={}", this, _out_of_range, _skip_in_progress); if (!_in_progress || _skip_in_progress) { - sstlog.trace("mp_row_consumer {}: advance_context({})", this, _fwd_range.start()); - return _sm->advance_context(_fwd_range.start()); + sstlog.trace("mp_row_consumer {}: advance_context({})", this, start); + return _sm->advance_context(start); } return make_ready_future<>();