mutation_partition: Extract deletable_row::compact_and_expire()

This commit is contained in:
Tomasz Grabiec
2021-12-14 02:42:26 +01:00
parent 0e3c4fc641
commit 080c403d0b
2 changed files with 38 additions and 11 deletions

View File

@@ -1317,17 +1317,8 @@ uint32_t mutation_partition::do_compact(const schema& s,
return stop_iteration::no;
}
deletable_row& row = e.row();
const auto higher_tomb = std::max(_tombstone, range_tombstone_for_row(s, e.key()));
row_tombstone tomb = row.deleted_at();
tomb.apply(higher_tomb);
bool is_live = row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before);
is_live |= row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before, row.marker());
if (should_purge_row_tombstone(row.deleted_at()) || row.deleted_at().tomb() <= higher_tomb) {
row.remove_tombstone();
}
tombstone tomb = range_tombstone_for_row(s, e.key());
bool is_live = row.compact_and_expire(s, tomb, query_time, can_gc, gc_before, nullptr);
return stop_iteration(is_live && ++row_count == row_limit);
};
@@ -1673,6 +1664,29 @@ std::ostream& operator<<(std::ostream& os, const lazy_row::printer& p) {
return os << row::printer(p._schema, p._kind, p._row.get());
}
bool deletable_row::compact_and_expire(const schema& s,
tombstone tomb,
gc_clock::time_point query_time,
can_gc_fn& can_gc,
gc_clock::time_point gc_before,
compaction_garbage_collector* collector)
{
auto should_purge_row_tombstone = [&] (const row_tombstone& t) {
return t.max_deletion_time() < gc_before && can_gc(t.tomb());
};
apply(tomb);
bool is_live = marker().compact_and_expire(deleted_at().tomb(), query_time, can_gc, gc_before);
is_live |= cells().compact_and_expire(s, column_kind::regular_column, deleted_at(), query_time, can_gc, gc_before, marker(), collector);
if (deleted_at().tomb() <= tomb || should_purge_row_tombstone(deleted_at())) {
remove_tombstone();
}
return is_live;
}
deletable_row deletable_row::difference(const schema& s, column_kind kind, const deletable_row& other) const
{
deletable_row dr;

View File

@@ -866,6 +866,19 @@ public:
bool empty() const { return !_deleted_at && _marker.is_missing() && !_cells.size(); }
deletable_row difference(const schema&, column_kind, const deletable_row& other) const;
// Expires cells and tombstones. Removes items covered by higher level
// tombstones.
// Returns true iff the row is still alive.
// When empty() after the call, the row can be removed without losing writes
// given that tomb will be still in effect for the row after it is removed,
// as a range tombstone, partition tombstone, etc.
bool compact_and_expire(const schema&,
tombstone tomb,
gc_clock::time_point query_time,
can_gc_fn& can_gc,
gc_clock::time_point gc_before,
compaction_garbage_collector* collector = nullptr);
class printer {
const schema& _schema;
const deletable_row& _deletable_row;