diff --git a/mutation_partition.cc b/mutation_partition.cc
index 8a37b3fd81..76c1a45c8f 100644
--- a/mutation_partition.cc
+++ b/mutation_partition.cc
@@ -305,14 +305,110 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
     _static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
     _static_row_continuous |= p._static_row_continuous;
 
+    rows_entry::tri_compare cmp(s);
+    auto del = current_deleter();
+
+    // Compacts rows in [i, end) with the tombstone.
+    // Erases entries which are left empty by compaction.
+    // Does not affect continuity.
+    // When preempted, stores the position to resume from (tagged with `stage`) in `res`
+    // and returns stop_iteration::no; otherwise returns stop_iteration::yes.
+    auto apply_tombstone_to_rows = [&] (apply_resume::stage stage, tombstone tomb, rows_type::iterator i, rows_type::iterator end) -> stop_iteration {
+        if (!preemptible) {
+            // Compaction is attempted only in preemptible contexts because it can be expensive to perform and is not
+            // necessary for correctness.
+            return stop_iteration::yes;
+        }
+
+        // Loop-invariant, so it is built once rather than once per row.
+        can_gc_fn never_gc = [](tombstone) { return false; };
+
+        while (i != end) {
+            rows_entry& e = *i;
+
+            bool all_dead = e.dummy() || !e.row().compact_and_expire(s,
+                                                                     tomb,
+                                                                     gc_clock::time_point::min(),  // no TTL expiration
+                                                                     never_gc,                     // no GC
+                                                                     gc_clock::time_point::min()); // no GC
+
+            auto next_i = std::next(i);
+            bool inside_continuous_range = !tracker ||
+                    (e.continuous() && (next_i != _rows.end() && next_i->continuous()));
+
+            if (all_dead && e.row().empty() && inside_continuous_range) {
+                i = _rows.erase(i);
+                if (tracker) {
+                    tracker->on_remove();
+                }
+                del(&e);
+            } else {
+                i = next_i;
+            }
+
+            if (need_preempt() && i != end) {
+                res = apply_resume(stage, i->position());
+                return stop_iteration::no;
+            }
+        }
+        return stop_iteration::yes;
+    };
+
+    // Compact existing rows with the incoming range tombstones before those are merged in.
+    // When resuming this stage, tombstones ending at or before res._pos are skipped.
+    if (res._stage <= apply_resume::stage::range_tombstone_compaction) {
+        bool filtering_tombstones = res._stage == apply_resume::stage::range_tombstone_compaction;
+        for (const range_tombstone_entry& rt : p._row_tombstones) {
+            position_in_partition_view pos = rt.position();
+            if (filtering_tombstones) {
+                if (cmp(res._pos, rt.end_position()) >= 0) {
+                    continue;
+                }
+                filtering_tombstones = false;
+                if (cmp(res._pos, rt.position()) > 0) {
+                    pos = res._pos;
+                }
+            }
+            auto i = _rows.lower_bound(pos, cmp);
+            if (i == _rows.end()) {
+                break;
+            }
+            auto end = _rows.lower_bound(rt.end_position(), cmp);
+
+            auto tomb = _tombstone;
+            tomb.apply(rt.tombstone().tomb);
+
+            if (apply_tombstone_to_rows(apply_resume::stage::range_tombstone_compaction, tomb, i, end) == stop_iteration::no) {
+                return stop_iteration::no;
+            }
+        }
+    }
+
     if (_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones), preemptible) == stop_iteration::no) {
         app_stats.has_any_tombstones |= !_row_tombstones.empty();
+        res = apply_resume::merging_range_tombstones();
         return stop_iteration::no;
     }
     app_stats.has_any_tombstones |= !_row_tombstones.empty();
 
-    rows_entry::tri_compare cmp(s);
-    auto del = current_deleter();
+    if (p._tombstone) {
+        // p._tombstone is already applied to _tombstone
+        rows_type::iterator i;
+        if (res._stage == apply_resume::stage::partition_tombstone_compaction) {
+            i = _rows.lower_bound(res._pos, cmp);
+        } else {
+            i = _rows.begin();
+        }
+        if (apply_tombstone_to_rows(apply_resume::stage::partition_tombstone_compaction,
+                                    _tombstone, i, _rows.end()) == stop_iteration::no) {
+            return stop_iteration::no;
+        }
+        // TODO: Drop redundant range tombstones
+        p._tombstone = {};
+    }
+
+    res = apply_resume::merging_rows();
+
     auto p_i = p._rows.begin();
     auto i = _rows.begin();
     while (p_i != p._rows.end()) {
diff --git a/mutation_partition.hh b/mutation_partition.hh
index a69a9ebfbc..0d20efaba7 100644
--- a/mutation_partition.hh
+++ b/mutation_partition.hh
@@ -1046,6 +1046,10 @@ struct mutation_application_stats {
 struct apply_resume {
     enum class stage {
         start,
+        range_tombstone_compaction,
+        merging_range_tombstones,
+        partition_tombstone_compaction,
+        merging_rows,
         done
     };
 
@@ -1082,6 +1086,16 @@ struct apply_resume {
 
     operator bool() const { return _stage != stage::done; }
 
+    // Resume state for the merging_rows stage, positioned at the partition start.
+    static apply_resume merging_rows() {
+        return {stage::merging_rows, position_in_partition::for_partition_start()};
+    }
+
+    // Resume state for the merging_range_tombstones stage, positioned at the partition start.
+    static apply_resume merging_range_tombstones() {
+        return {stage::merging_range_tombstones, position_in_partition::for_partition_start()};
+    }
+
     static apply_resume done() {
         return {stage::done, position_in_partition::for_partition_start()};
     }
diff --git a/test/boost/memtable_test.cc b/test/boost/memtable_test.cc
index 3f5d9a93ff..8146e39684 100644
--- a/test/boost/memtable_test.cc
+++ b/test/boost/memtable_test.cc
@@ -571,6 +571,75 @@ SEASTAR_THREAD_TEST_CASE(test_tombstone_compaction_during_flush) {
     mt->cleaner().drain().get();
 }
 
+// Verifies that applying a range tombstone, and later a newer partition tombstone,
+// to a memtable drops the rows they cover, while an older partition tombstone drops nothing.
+SEASTAR_THREAD_TEST_CASE(test_range_tombstones_are_compacted_with_data) {
+    tests::reader_concurrency_semaphore_wrapper semaphore;
+    simple_schema ss;
+    auto mt = make_lw_shared<replica::memtable>(ss.schema());
+
+    auto pk = ss.make_pkey(0);
+    auto pr = dht::partition_range::make_singular(pk);
+
+    auto old_tombstone = ss.new_tombstone(); // older than any write, does not cover anything
+
+    {
+        mutation m(ss.schema(), pk);
+        ss.add_row(m, ss.make_ckey(1), "v1");
+        ss.add_row(m, ss.make_ckey(2), "v1");
+        ss.add_row(m, ss.make_ckey(3), "v1");
+        ss.add_row(m, ss.make_ckey(4), "v1");
+        mt->apply(m);
+    }
+
+    mutation rt_m(ss.schema(), pk);
+    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(2,3));
+    mt->apply(rt_m);
+    mt->cleaner().drain().get();
+
+    assert_that(mt->make_flat_reader(ss.schema(), semaphore.make_permit(), pr))
+        .produces_partition_start(pk)
+        .produces_row_with_key(ss.make_ckey(1))
+        .produces_range_tombstone_change({rt.position(), rt.tomb})
+        .produces_range_tombstone_change({rt.end_position(), {}})
+        .produces_row_with_key(ss.make_ckey(4))
+        .produces_partition_end()
+        .produces_end_of_stream();
+
+    {
+        mutation m(ss.schema(), pk);
+        m.partition().apply(old_tombstone);
+        mt->apply(m);
+        mt->cleaner().drain().get();
+    }
+
+    // The older tombstone covers nothing, so only the partition tombstone changes.
+    assert_that(mt->make_flat_reader(ss.schema(), semaphore.make_permit(), pr))
+        .produces_partition_start(pk, {old_tombstone})
+        .produces_row_with_key(ss.make_ckey(1))
+        .produces_range_tombstone_change({rt.position(), rt.tomb})
+        .produces_range_tombstone_change({rt.end_position(), {}})
+        .produces_row_with_key(ss.make_ckey(4))
+        .produces_partition_end()
+        .produces_end_of_stream();
+
+    auto new_tomb = ss.new_tombstone();
+
+    {
+        mutation m(ss.schema(), pk);
+        m.partition().apply(new_tomb);
+        mt->apply(m);
+        mt->cleaner().drain().get();
+    }
+
+    assert_that(mt->make_flat_reader(ss.schema(), semaphore.make_permit(), pr))
+        .produces_partition_start(pk, {new_tomb})
+        .produces_range_tombstone_change({rt.position(), rt.tomb})
+        .produces_range_tombstone_change({rt.end_position(), {}})
+        .produces_partition_end()
+        .produces_end_of_stream();
+}
+
 SEASTAR_TEST_CASE(test_hash_is_cached) {
     return seastar::async([] {
         auto s = schema_builder("ks", "cf")