diff --git a/sstables/partition.cc b/sstables/partition.cc index 119ca4d0f3..7edd4536ad 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -577,24 +577,32 @@ public: }; class sstable_streamed_mutation : public streamed_mutation::impl { + const schema& _schema; data_consume_context& _context; mp_row_consumer& _consumer; tombstone _t; bool _finished = false; range_tombstone_stream _range_tombstones; + mutation_fragment_opt _current_candidate; mutation_fragment_opt _next_candidate; + stdx::optional _last_position; + position_in_partition::less_compare _cmp; + position_in_partition::equal_compare _eq; private: future read_next() { // Because of #1203 we may encounter sstables with range tombstones // placed earler than expected. - if (_next_candidate) { - auto mf = _range_tombstones.get_next(*_next_candidate); + if (_next_candidate || (_current_candidate && _finished)) { + assert(_current_candidate); + auto mf = _range_tombstones.get_next(*_current_candidate); if (!mf) { - mf = move_and_disengage(_next_candidate); + mf = move_and_disengage(_current_candidate); + _current_candidate = move_and_disengage(_next_candidate); } return make_ready_future(std::move(mf)); } if (_finished) { + // No need to update _last_position here. We've already read everything from the sstable. return make_ready_future(_range_tombstones.get_next()); } return _context.read().then([this] { @@ -602,17 +610,35 @@ private: _finished = true; } auto mf = _consumer.get_mutation_fragment(); - if (mf && mf->is_range_tombstone()) { - _range_tombstones.apply(std::move(mf->as_range_tombstone())); - } else { - _next_candidate = std::move(mf); + if (mf) { + if (mf->is_range_tombstone()) { + // If sstable uses promoted index it will repeat relevant range tombstones in + // each block. Do not emit these duplicates as they will break the guarantee + // that mutation fragments are produced in ascending order.
+ if (!_last_position || !_cmp(*mf, *_last_position)) { + _last_position = mf->position(); + _range_tombstones.apply(std::move(mf->as_range_tombstone())); + } + } else { + // mp_row_consumer may produce mutation_fragments in parts if they are + // interrupted by a range tombstone duplicate. Make sure they are merged + // before emitting them. + _last_position = mf->position(); + if (!_current_candidate) { + _current_candidate = std::move(mf); + } else if (_current_candidate && _eq(*_current_candidate, *mf)) { + _current_candidate->apply(_schema, std::move(*mf)); + } else { + _next_candidate = std::move(mf); + } + } } return read_next(); }); } public: sstable_streamed_mutation(schema_ptr s, dht::decorated_key dk, data_consume_context& context, mp_row_consumer& consumer, tombstone t) - : streamed_mutation::impl(s, std::move(dk), t), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s) { } + : streamed_mutation::impl(s, std::move(dk), t), _schema(*s), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s), _cmp(*s), _eq(*s) { } virtual future<> fill_buffer() final override { return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {