mutation_partition: fix iterator invalidation in trim_rows

Reversed iterators are adaptors for 'normal' iterators. These underlying
iterators point to different objects that the reversed iterators
themselves.

The consequence of this is that removing an element pointed to by a
reversed iterator may invalidate reversed iterator which point to a
completely different object.

This is what happens in trim_rows for reversed queries. Erasing a row
can invalidate end iterator and the loop would fail to stop.

The solution is to introduce
reversal_traits::erase_dispose_and_update_end() funcion which erases and
disposes object pointed to by a given iterator but takes also a
reference to and end iterator and updates it if necessary to make sure
that it stays valid.

Fixes #1609.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1472080609-11642-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 6012a7e733)
This commit is contained in:
Paweł Dziepak
2016-08-25 00:16:49 +01:00
committed by Pekka Enberg
parent ec3ace5aa3
commit bbffb51811
2 changed files with 78 additions and 1 deletions

View File

@@ -57,6 +57,14 @@ struct reversal_traits<false> {
return c.erase_and_dispose(begin, end, std::move(disposer));
}
template<typename Container, typename Disposer>
static typename Container::iterator erase_dispose_and_update_end(Container& c,
typename Container::iterator it, Disposer&& disposer,
typename Container::iterator&)
{
return c.erase_and_dispose(it, std::forward<Disposer>(disposer));
}
template <typename Container>
static boost::iterator_range<typename Container::iterator> maybe_reverse(
Container& c, boost::iterator_range<typename Container::iterator> r)
@@ -93,6 +101,24 @@ struct reversal_traits<true> {
);
}
// Erases element pointed to by it and makes sure than iterator end is not
// invalidated.
template<typename Container, typename Disposer>
static typename Container::reverse_iterator erase_dispose_and_update_end(Container& c,
typename Container::reverse_iterator it, Disposer&& disposer,
typename Container::reverse_iterator& end)
{
auto to_erase = std::next(it).base();
bool update_end = end.base() == to_erase;
auto ret = typename Container::reverse_iterator(
c.erase_and_dispose(to_erase, std::forward<Disposer>(disposer))
);
if (update_end) {
end = ret;
}
return ret;
}
template <typename Container>
static boost::iterator_range<typename Container::reverse_iterator> maybe_reverse(
Container& c, boost::iterator_range<typename Container::iterator> r)
@@ -1136,7 +1162,7 @@ void mutation_partition::trim_rows(const schema& s,
}
if (e.empty()) {
last = reversal_traits<reversed>::erase_and_dispose(_rows, last, std::next(last, 1), deleter);
last = reversal_traits<reversed>::erase_dispose_and_update_end(_rows, last, deleter, end);
} else {
++last;
}

View File

@@ -1391,3 +1391,54 @@ SEASTAR_TEST_CASE(test_slicing_mutation) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_trim_rows) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type)
.build();
auto pk = partition_key::from_exploded(*s, { int32_type->decompose(0) });
mutation m(pk, s);
constexpr auto row_count = 8;
for (auto i = 0; i < row_count; i++) {
m.set_clustered_cell(clustering_key_prefix::from_single_value(*s, int32_type->decompose(i)),
to_bytes("v"), data_value(i), api::new_timestamp() - 5);
}
m.partition().apply(tombstone(api::new_timestamp(), gc_clock::now()));
auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(1));
auto compact_and_expect_empty = [&] (mutation m, std::vector<query::clustering_range> ranges) {
mutation m2 = m;
m.partition().compact_for_query(*s, now, ranges, false, query::max_rows);
BOOST_REQUIRE(m.partition().clustered_rows().empty());
std::reverse(ranges.begin(), ranges.end());
m2.partition().compact_for_query(*s, now, ranges, true, query::max_rows);
BOOST_REQUIRE(m2.partition().clustered_rows().empty());
};
std::vector<query::clustering_range> ranges = {
query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)))
};
compact_and_expect_empty(m, ranges);
ranges = {
query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(50)))
};
compact_and_expect_empty(m, ranges);
ranges = {
query::clustering_range::make_ending_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)))
};
compact_and_expect_empty(m, ranges);
ranges = {
query::clustering_range::make_open_ended_both_sides()
};
compact_and_expect_empty(m, ranges);
});
}