diff --git a/sstables/partition.cc b/sstables/partition.cc index 240b8b157b..94ebc961b3 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -123,6 +123,9 @@ private: // When set, the fragment pending in _in_progress should not be emitted. bool _skip_in_progress = false; + // The value of _ck_ranges_walker->lower_bound_change_counter() last time we tried to skip to _ck_ranges_walker->lower_bound(). + size_t _last_lower_bound_counter = 0; + // We don't have "end of clustering row" markers. So we know that the current // row has ended once we get something (e.g. a live cell) that belongs to another // one. If that happens sstable reader is interrupted (proceed::no) but we @@ -351,6 +354,7 @@ private: sstlog.trace("mp_row_consumer {}: set_up_ck_ranges({})", this, pk); _ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk); _ck_ranges_walker = clustering_ranges_walker(*_schema, _ck_ranges->ranges()); + _last_lower_bound_counter = 0; _fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows(); _out_of_range = false; _range_tombstones.reset(); @@ -433,10 +437,13 @@ public: ret = flush(); } advance_to(rt); + _in_progress = mutation_fragment(std::move(rt)); if (_out_of_range) { ret = push_ready_fragments_out_of_range(); } - _in_progress = mutation_fragment(std::move(rt)); + if (needs_skip()) { + ret = proceed::no; + } return ret; } @@ -458,14 +465,17 @@ public: } if (!_in_progress) { advance_to(pos); - if (_out_of_range) { - ret = push_ready_fragments_out_of_range(); - } if (is_static) { _in_progress = mutation_fragment(static_row()); } else { _in_progress = mutation_fragment(clustering_row(std::move(pos.key()))); } + if (_out_of_range) { + ret = push_ready_fragments_out_of_range(); + } + if (needs_skip()) { + ret = proceed::no; + } } return ret; } @@ -786,6 +796,15 @@ public: // must be after it. 
// future<> fast_forward_to(position_range); + + bool needs_skip() const { + return (_skip_in_progress || !_in_progress) + && _last_lower_bound_counter != _ck_ranges_walker->lower_bound_change_counter(); + } + + // Fast forwards the consuming context to _ck_ranges_walker->lower_bound() when needs_skip() is true. + // Must be called outside consuming context. + future<> maybe_skip(); }; struct sstable_data_source { @@ -861,7 +880,9 @@ public: if (is_buffer_full() || is_end_of_stream()) { return make_ready_future<>(); } - return _ds->_context.read(); + return _ds->_consumer.maybe_skip().then([this] { + return _ds->_context.read(); + }); } future<> fast_forward_to(position_range range) override { @@ -989,9 +1010,20 @@ future<> mp_row_consumer::fast_forward_to(position_range r) { } sstlog.trace("mp_row_consumer {}: advance_context({})", this, start); + _last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter(); return _sm->advance_context(start); } +future<> mp_row_consumer::maybe_skip() { + if (!needs_skip()) { + return make_ready_future<>(); + } + _last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter(); + auto pos = _ck_ranges_walker->lower_bound(); + sstlog.trace("mp_row_consumer {}: advance_context({})", this, pos); + return _sm->advance_context(pos); +} + static int adjust_binary_search_index(int idx) { if (idx < 0) { // binary search gives us the first index _greater_ than the key searched for,