From 08032db269cc93ef3aa683e96f99f58cb9d9da03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 19 Jul 2016 14:56:29 +0100 Subject: [PATCH] sstables: protect against duplicated range tombstones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Promoted index may cause sstable to have range tombstones duplicated several times. These duplicates appear in the "wrong" place since they are smaller than the entity preceding them. This patch ignores such duplicates by skipping range tombstones that are smaller than previously read ones. Moreover, these duplicated range tombstones may appear in the middle of a clustering row, so the sstable reader has also gained the ability to merge parts of the row in such cases. Signed-off-by: Paweł Dziepak --- sstables/partition.cc | 42 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/sstables/partition.cc b/sstables/partition.cc index 119ca4d0f3..7edd4536ad 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -577,24 +577,32 @@ public: }; class sstable_streamed_mutation : public streamed_mutation::impl { + const schema& _schema; data_consume_context& _context; mp_row_consumer& _consumer; tombstone _t; bool _finished = false; range_tombstone_stream _range_tombstones; + mutation_fragment_opt _current_candidate; mutation_fragment_opt _next_candidate; + stdx::optional _last_position; + position_in_partition::less_compare _cmp; + position_in_partition::equal_compare _eq; private: future read_next() { // Because of #1203 we may encounter sstables with range tombstones // placed earler than expected. 
- if (_next_candidate) { - auto mf = _range_tombstones.get_next(*_next_candidate); + if (_next_candidate || (_current_candidate && _finished)) { + assert(_current_candidate); + auto mf = _range_tombstones.get_next(*_current_candidate); if (!mf) { - mf = move_and_disengage(_next_candidate); + mf = move_and_disengage(_current_candidate); + _current_candidate = move_and_disengage(_next_candidate); } return make_ready_future(std::move(mf)); } if (_finished) { + // No need to update _last_position here. We've already read everything from the sstable. return make_ready_future(_range_tombstones.get_next()); } return _context.read().then([this] { @@ -602,17 +610,35 @@ private: _finished = true; } auto mf = _consumer.get_mutation_fragment(); - if (mf && mf->is_range_tombstone()) { - _range_tombstones.apply(std::move(mf->as_range_tombstone())); - } else { - _next_candidate = std::move(mf); + if (mf) { + if (mf->is_range_tombstone()) { + // If sstable uses promoted index it will repeat relevant range tombstones in + // each block. Do not emit these duplicates as they will break the guarantee + // that mutation fragments are produced in ascending order. + if (!_last_position || !_cmp(*mf, *_last_position)) { + _last_position = mf->position(); + _range_tombstones.apply(std::move(mf->as_range_tombstone())); + } + } else { + // mp_row_consumer may produce mutation_fragments in parts if they are + // interrupted by a range tombstone duplicate. Make sure they are merged + // before emitting them. 
+ _last_position = mf->position(); + if (!_current_candidate) { + _current_candidate = std::move(mf); + } else if (_current_candidate && _eq(*_current_candidate, *mf)) { + _current_candidate->apply(_schema, std::move(*mf)); + } else { + _next_candidate = std::move(mf); + } + } } return read_next(); }); } public: sstable_streamed_mutation(schema_ptr s, dht::decorated_key dk, data_consume_context& context, mp_row_consumer& consumer, tombstone t) - : streamed_mutation::impl(s, std::move(dk), t), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s) { } + : streamed_mutation::impl(s, std::move(dk), t), _schema(*s), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s), _cmp(*s), _eq(*s) { } virtual future<> fill_buffer() final override { return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {