partition_snapshot_reader: don't re-emit range tombstones overlapping multiple ck ranges
When entering a new ck range (of the partition-slice), the partition
snapshot reader will apply to its range tombstones stream all the
tombstones that are relevant to the new ck range. When the partition has
range tombstones that overlap with multiple ck ranges, these will be
applied to the range tombstone stream when entering any of the ck ranges
they overlap with. This will result in the violation of the monotonicity
of the mutation fragments emitted by the reader, as these range
tombstones will be re-emitted on each ck range, if the ck range has at
least one clustering row they apply to.
For example, given the following partition:
rt{[1,10]}, cr{1}, cr{2}, cr{3}...
And a partition-slice with the following ck ranges:
[1,2], [3, 4]
The reader will emit the following fragment stream:
rt{[1,10]}, cr{1}, cr{2}, rt{[1,10]}, cr{3}, ...
Note how the range tombstone is emitted twice. In addition to violating
the monotonicity guarantee, this can also result in an explosion of the
number of emitted range tombstones.
Fix by applying to the range tombstone stream, when entering a new ck
range, only those range tombstones whose position is strictly greater
than that of the last emitted clustering row (or range tombstone).
Fixes: #4104
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <e047af76df75972acb3c32c7ef9bb5d65d804c82.1547916701.git.bdenes@scylladb.com>
This commit is contained in:
@@ -268,6 +268,14 @@ void range_tombstone_stream::apply(const range_tombstone_list& list, const query
|
||||
_list.apply(_schema, rt);
|
||||
}
|
||||
}
|
||||
void range_tombstone_stream::apply(const range_tombstone_list& list, const query::clustering_range& range, position_in_partition_view last_pos) {
    // Apply only those tombstones from `list` that overlap with `range`
    // and whose position lies strictly after `last_pos` (the last emitted
    // clustering row or range tombstone). Tombstones at or before
    // `last_pos` were already emitted for an earlier ck range, so
    // re-applying them would break the monotonicity of the fragment
    // stream (see #4104).
    const auto cmp = position_in_partition::tri_compare(_schema);
    for (const range_tombstone& rt : list.slice(_schema, range)) {
        if (cmp(rt.position(), last_pos) <= 0) {
            continue; // already covered by a previous ck range
        }
        _list.apply(_schema, rt);
    }
}
|
||||
|
||||
void range_tombstone_stream::reset() {
|
||||
_list.clear();
|
||||
|
||||
@@ -620,7 +620,13 @@ public:
|
||||
void apply(const range_tombstone_list& list) {
|
||||
_list.apply(_schema, list);
|
||||
}
|
||||
// Apply those range tombstones from the list, that overlap with the
|
||||
// range.
|
||||
void apply(const range_tombstone_list&, const query::clustering_range&);
|
||||
// Apply those range tombstones from the list, that overlap with the
|
||||
// range *and* have a start position that is strictly greater than
// the given position.
|
||||
void apply(const range_tombstone_list&, const query::clustering_range&, position_in_partition_view);
|
||||
void reset();
|
||||
bool empty() const;
|
||||
friend std::ostream& operator<<(std::ostream& out, const range_tombstone_stream&);
|
||||
|
||||
@@ -78,22 +78,23 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
});
|
||||
}
|
||||
void refresh_state(const query::clustering_range& ck_range,
|
||||
const std::optional<position_in_partition>& last_row,
|
||||
range_tombstone_stream& range_tombstones) {
|
||||
const position_in_partition& last_row,
|
||||
range_tombstone_stream& range_tombstones,
|
||||
bool new_range) {
|
||||
_clustering_rows.clear();
|
||||
|
||||
if (!last_row) {
|
||||
if (new_range) {
|
||||
// New range. Collect all relevant range tombstones.
|
||||
for (auto&& v : _snapshot->versions()) {
|
||||
range_tombstones.apply(v.partition().row_tombstones(), ck_range);
|
||||
range_tombstones.apply(v.partition().row_tombstones(), ck_range, last_row);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&& v : _snapshot->versions()) {
|
||||
auto cr_end = v.partition().upper_bound(_schema, ck_range);
|
||||
auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
|
||||
if (last_row) {
|
||||
return v.partition().clustered_rows().upper_bound(*last_row, _cmp);
|
||||
if (!new_range) {
|
||||
return v.partition().clustered_rows().upper_bound(last_row, _cmp);
|
||||
} else {
|
||||
return v.partition().lower_bound(_schema, ck_range);
|
||||
}
|
||||
@@ -157,19 +158,22 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
}
|
||||
|
||||
// Returns next clustered row in the range.
|
||||
// If the ck_range is the same as the one used previously last_row needs
|
||||
// to be engaged and equal the position of the row returned last time.
|
||||
// If the ck_range is different or this is the first call to this
|
||||
// function last_row has to be disengaged. Additionally, when entering
|
||||
// new range range_tombstones will be populated with all relevant
|
||||
// tombstones.
|
||||
// `last_row` should be the position of the last emitted row or
|
||||
// range tombstone.
|
||||
// `new_range` should be set when the read entered a new ck
|
||||
// range. When entering new range, range_tombstones will be populated
|
||||
// with all relevant tombstones. Tombstones, whose start position is
|
||||
// not strictly greater than `last_row` will not be applied to
|
||||
// `range_tombstones`. This is to ensure tombstones that span
|
||||
// more than one ck range are emitted just once.
|
||||
mutation_fragment_opt next_row(const query::clustering_range& ck_range,
|
||||
const std::optional<position_in_partition>& last_row,
|
||||
range_tombstone_stream& range_tombstones) {
|
||||
const position_in_partition& last_row,
|
||||
range_tombstone_stream& range_tombstones,
|
||||
bool new_range) {
|
||||
return in_alloc_section([&] () -> mutation_fragment_opt {
|
||||
auto mark = _snapshot->get_change_mark();
|
||||
if (!last_row || mark != _change_mark) {
|
||||
refresh_state(ck_range, last_row, range_tombstones);
|
||||
if (new_range || mark != _change_mark) {
|
||||
refresh_state(ck_range, last_row, range_tombstones, new_range);
|
||||
_change_mark = mark;
|
||||
}
|
||||
while (has_more_rows()) {
|
||||
@@ -203,9 +207,10 @@ private:
|
||||
query::clustering_row_ranges::const_iterator _current_ck_range;
|
||||
query::clustering_row_ranges::const_iterator _ck_range_end;
|
||||
|
||||
std::optional<position_in_partition> _last_entry;
|
||||
position_in_partition _last_entry;
|
||||
mutation_fragment_opt _next_row;
|
||||
range_tombstone_stream _range_tombstones;
|
||||
bool _new_range = true;
|
||||
|
||||
lsa_partition_reader _reader;
|
||||
bool _no_more_rows_in_current_range = false;
|
||||
@@ -223,7 +228,7 @@ private:
|
||||
|
||||
mutation_fragment_opt read_next() {
|
||||
if (!_next_row && !_no_more_rows_in_current_range) {
|
||||
_next_row = _reader.next_row(*_current_ck_range, _last_entry, _range_tombstones);
|
||||
_next_row = _reader.next_row(*_current_ck_range, _last_entry, _range_tombstones, std::exchange(_new_range, false));
|
||||
}
|
||||
if (_next_row) {
|
||||
auto pos_view = _next_row->as_clustering_row().position();
|
||||
@@ -258,7 +263,7 @@ private:
|
||||
if (mfopt) {
|
||||
emplace_mutation_fragment(std::move(*mfopt));
|
||||
} else {
|
||||
_last_entry = std::nullopt;
|
||||
_new_range = true;
|
||||
_current_ck_range = std::next(_current_ck_range);
|
||||
on_new_range();
|
||||
}
|
||||
@@ -276,6 +281,7 @@ public:
|
||||
, _ck_ranges(std::move(crr))
|
||||
, _current_ck_range(_ck_ranges.begin())
|
||||
, _ck_range_end(_ck_ranges.end())
|
||||
, _last_entry(position_in_partition::before_all_clustered_rows())
|
||||
, _range_tombstones(*_schema)
|
||||
, _reader(*_schema, std::move(snp), region, read_section, digest_requested)
|
||||
{
|
||||
|
||||
@@ -304,6 +304,11 @@ static void test_slicing_and_fast_forwarding(populate_fn populate) {
|
||||
.build();
|
||||
|
||||
test_common(slice);
|
||||
|
||||
BOOST_TEST_MESSAGE("Test monotonic positions");
|
||||
auto mr = ms.make_reader(s.schema(), query::full_partition_range, slice,
|
||||
default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr);
|
||||
assert_that(std::move(mr)).has_monotonic_positions();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user