sstables: protect against duplicated range tombstones

Promoted index may cause sstable to have range tombstones duplicated
several times. These duplicates appear in the "wrong" place since they
are smaller than the entity preceding them.

This patch ignores such duplicates by skipping range tombstones that are
smaller than previously read ones.

Moreover, these duplicated range tombstones may appear in the middle of
clustering row, so the sstable reader has also gained the ability to
merge parts of the row in such cases.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
Paweł Dziepak
2016-07-19 14:56:29 +01:00
parent 50469e5ef3
commit 08032db269
Notes: Pekka Enberg 2016-07-27 14:05:58 +03:00
backport: 1.3

View File

@@ -577,24 +577,32 @@ public:
};
class sstable_streamed_mutation : public streamed_mutation::impl {
const schema& _schema;
data_consume_context& _context;
mp_row_consumer& _consumer;
tombstone _t;
bool _finished = false;
range_tombstone_stream _range_tombstones;
mutation_fragment_opt _current_candidate;
mutation_fragment_opt _next_candidate;
stdx::optional<position_in_partition> _last_position;
position_in_partition::less_compare _cmp;
position_in_partition::equal_compare _eq;
private:
future<mutation_fragment_opt> read_next() {
// Because of #1203 we may encounter sstables with range tombstones
// placed earler than expected.
if (_next_candidate) {
auto mf = _range_tombstones.get_next(*_next_candidate);
if (_next_candidate || (_current_candidate && _finished)) {
assert(_current_candidate);
auto mf = _range_tombstones.get_next(*_current_candidate);
if (!mf) {
mf = move_and_disengage(_next_candidate);
mf = move_and_disengage(_current_candidate);
_current_candidate = move_and_disengage(_next_candidate);
}
return make_ready_future<mutation_fragment_opt>(std::move(mf));
}
if (_finished) {
// No need to update _last_position here. We've already read everything from the sstable.
return make_ready_future<mutation_fragment_opt>(_range_tombstones.get_next());
}
return _context.read().then([this] {
@@ -602,17 +610,35 @@ private:
_finished = true;
}
auto mf = _consumer.get_mutation_fragment();
if (mf && mf->is_range_tombstone()) {
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
} else {
_next_candidate = std::move(mf);
if (mf) {
if (mf->is_range_tombstone()) {
// If sstable uses promoted index it will repeat relevant range tombstones in
// each block. Do not emit these duplicates as they will break the guarantee
// that mutation fragment are produced in ascending order.
if (!_last_position || !_cmp(*mf, *_last_position)) {
_last_position = mf->position();
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
}
} else {
// mp_row_consumer may produce mutation_fragments in parts if they are
// interrupted by range tombstone duplicate. Make sure they are merged
// before emitting them.
_last_position = mf->position();
if (!_current_candidate) {
_current_candidate = std::move(mf);
} else if (_current_candidate && _eq(*_current_candidate, *mf)) {
_current_candidate->apply(_schema, std::move(*mf));
} else {
_next_candidate = std::move(mf);
}
}
}
return read_next();
});
}
public:
sstable_streamed_mutation(schema_ptr s, dht::decorated_key dk, data_consume_context& context, mp_row_consumer& consumer, tombstone t)
: streamed_mutation::impl(s, std::move(dk), t), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s) { }
: streamed_mutation::impl(s, std::move(dk), t), _schema(*s), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s), _cmp(*s), _eq(*s) { }
virtual future<> fill_buffer() final override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {