diff --git a/mutation_partition.cc b/mutation_partition.cc index 56caf767ed..7a6f5a19e6 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -57,6 +57,14 @@ struct reversal_traits { return c.erase_and_dispose(begin, end, std::move(disposer)); } + template + static typename Container::iterator erase_dispose_and_update_end(Container& c, + typename Container::iterator it, Disposer&& disposer, + typename Container::iterator&) + { + return c.erase_and_dispose(it, std::forward(disposer)); + } + template static boost::iterator_range maybe_reverse( Container& c, boost::iterator_range r) @@ -93,6 +101,24 @@ struct reversal_traits { ); } + // Erases element pointed to by it and makes sure than iterator end is not + // invalidated. + template + static typename Container::reverse_iterator erase_dispose_and_update_end(Container& c, + typename Container::reverse_iterator it, Disposer&& disposer, + typename Container::reverse_iterator& end) + { + auto to_erase = std::next(it).base(); + bool update_end = end.base() == to_erase; + auto ret = typename Container::reverse_iterator( + c.erase_and_dispose(to_erase, std::forward(disposer)) + ); + if (update_end) { + end = ret; + } + return ret; + } + template static boost::iterator_range maybe_reverse( Container& c, boost::iterator_range r) @@ -1136,7 +1162,7 @@ void mutation_partition::trim_rows(const schema& s, } if (e.empty()) { - last = reversal_traits::erase_and_dispose(_rows, last, std::next(last, 1), deleter); + last = reversal_traits::erase_dispose_and_update_end(_rows, last, deleter, end); } else { ++last; } diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index e408e819ac..47e45eff22 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -1391,3 +1391,54 @@ SEASTAR_TEST_CASE(test_slicing_mutation) { return make_ready_future<>(); } + +SEASTAR_TEST_CASE(test_trim_rows) { + return seastar::async([] { + auto s = schema_builder("ks", "cf") + .with_column("pk", int32_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("v", int32_type) + .build(); + + auto pk = partition_key::from_exploded(*s, { int32_type->decompose(0) }); + mutation m(pk, s); + constexpr auto row_count = 8; + for (auto i = 0; i < row_count; i++) { + m.set_clustered_cell(clustering_key_prefix::from_single_value(*s, int32_type->decompose(i)), + to_bytes("v"), data_value(i), api::new_timestamp() - 5); + } + m.partition().apply(tombstone(api::new_timestamp(), gc_clock::now())); + + auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(1)); + + auto compact_and_expect_empty = [&] (mutation m, std::vector ranges) { + mutation m2 = m; + m.partition().compact_for_query(*s, now, ranges, false, query::max_rows); + BOOST_REQUIRE(m.partition().clustered_rows().empty()); + + std::reverse(ranges.begin(), ranges.end()); + m2.partition().compact_for_query(*s, now, ranges, true, query::max_rows); + BOOST_REQUIRE(m2.partition().clustered_rows().empty()); + }; + + std::vector ranges = { + query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5))) + }; + compact_and_expect_empty(m, ranges); + + ranges = { + query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(50))) + }; + compact_and_expect_empty(m, ranges); + + ranges = { + query::clustering_range::make_ending_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5))) + }; + compact_and_expect_empty(m, ranges); + + ranges = { + query::clustering_range::make_open_ended_both_sides() + }; + compact_and_expect_empty(m, ranges); + }); +} \ No newline at end of file