tests: Make sure range tombstone is properly split over rows with non-full keys.

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-09-26 17:19:43 -07:00
parent fbccae0d15
commit 26d4d276e9

View File

@@ -32,7 +32,6 @@ future<> normalizing_reader::fill_buffer(db::timeout_clock::time_point timeout)
return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this, timeout] {
return _rd.fill_buffer(timeout).then([this] {
position_in_partition::less_compare less{*_rd.schema()};
position_in_partition::equal_compare eq{*_rd.schema()};
while (!_rd.is_buffer_empty()) {
auto mf = _rd.pop_mutation_fragment();
if (mf.is_end_of_partition()) {
@@ -43,23 +42,16 @@ future<> normalizing_reader::fill_buffer(db::timeout_clock::time_point timeout)
_range_tombstones.apply(std::move(mf).as_range_tombstone());
continue;
} else if (mf.is_clustering_row()) {
const clustering_row& cr = mf.as_clustering_row();
auto ck = cr.key();
auto end_kind = clustering_key::make_full(*_schema, ck) ? bound_kind::excl_end : bound_kind::incl_end;
auto ck = mf.as_clustering_row().key();
clustering_key::make_full(*_rd.schema(), ck);
auto after_pos = position_in_partition::after_key(ck);
while (auto mfo = _range_tombstones.get_next(mf)) {
range_tombstone&& rt = std::move(*mfo).as_range_tombstone();
if (less(rt.end_position(), cr.position())
|| eq(rt.end_position(), position_in_partition::after_key(ck))) {
if (!less(after_pos, rt.end_position())) {
push_mutation_fragment(std::move(rt));
} else {
push_mutation_fragment(range_tombstone{
rt.start_bound(),
bound_view{ck, end_kind},
rt.tomb});
rt.trim_front(*_rd.schema(), position_in_partition::after_key(ck));
_range_tombstones.apply(std::move(rt));
break;
push_mutation_fragment(range_tombstone{rt.position(), after_pos, rt.tomb});
_range_tombstones.apply(range_tombstone{after_pos, rt.end_position(), rt.tomb});
}
}
}