diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 039aea62f1..b2a7d4c683 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -21,6 +21,7 @@ #pragma once +#include "compaction_garbage_collector.hh" #include "mutation_fragment.hh" static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) { @@ -60,6 +61,71 @@ struct detached_compaction_state { std::deque range_tombstones; }; +class noop_compacted_fragments_consumer { +public: + void consume_new_partition(const dht::decorated_key& dk) {} + void consume(tombstone t) {} + stop_iteration consume(static_row&& sr, tombstone, bool) { return stop_iteration::no; } + stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return stop_iteration::no; } + stop_iteration consume(range_tombstone&& rt) { return stop_iteration::no; } + stop_iteration consume_end_of_partition() { return stop_iteration::no; } + void consume_end_of_stream() {} +}; + +class mutation_compactor_garbage_collector : public compaction_garbage_collector { + const schema& _schema; + column_kind _kind; + std::optional _ckey; + row_tombstone _tomb; + row_marker _marker; + row _row; + +public: + explicit mutation_compactor_garbage_collector(const schema& schema) + : _schema(schema) { + } + void start_collecting_static_row() { + _kind = column_kind::static_column; + } + void start_collecting_clustering_row(clustering_key ckey) { + _kind = column_kind::regular_column; + _ckey = std::move(ckey); + } + void collect(row_tombstone tomb) { + _tomb = tomb; + } + virtual void collect(column_id id, atomic_cell cell) override { + _row.apply(_schema.column_at(_kind, id), std::move(cell)); + } + virtual void collect(column_id id, collection_type_impl::mutation mut) override { + if (mut.tomb || !mut.cells.empty()) { + const auto& cdef = _schema.column_at(_kind, id); + auto& ctype = *static_pointer_cast(cdef.type); + _row.apply(cdef, ctype.serialize_mutation_form(std::move(mut))); + } + } + virtual void collect(row_marker marker) override { + _marker = marker; + } + template + void consume_static_row(Consumer&& consumer) { + if (!_row.empty()) { + consumer(static_row(std::move(_row))); + _row = {}; + } + } + template + void consume_clustering_row(Consumer&& consumer) { + if (_tomb || !_marker.is_missing() || !_row.empty()) { + consumer(clustering_row(std::move(*_ckey), _tomb, _marker, std::move(_row))); + _ckey.reset(); + _tomb = {}; + _marker = {}; + _row = {}; + } + } +}; + // emit_only_live::yes will cause compact_for_query to emit only live // static and clustering rows. It doesn't affect the way range tombstones are // emitted. @@ -82,11 +148,14 @@ class compact_mutation_state { uint32_t _rows_in_current_partition; uint32_t _current_partition_limit; bool _empty_partition{}; + bool _empty_partition_in_gc_consumer{}; const dht::decorated_key* _dk{}; dht::decorated_key _last_dk; bool _has_ck_selector{}; std::optional _last_static_row; + + std::unique_ptr _collector; private: static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; @@ -95,6 +164,18 @@ private: return SSTableCompaction == compact_for_sstables::yes; } + template + void partition_is_not_empty_for_gc_consumer(GCConsumer& gc_consumer) { + if (_empty_partition_in_gc_consumer) { + _empty_partition_in_gc_consumer = false; + gc_consumer.consume_new_partition(*_dk); + auto pt = _range_tombstones.get_partition_tombstone(); + if (pt && can_purge_tombstone(pt)) { + gc_consumer.consume(pt); + } + } + } + template void partition_is_not_empty(Consumer& consumer) { if (_empty_partition) { @@ -162,6 +243,7 @@ public: , _slice(s.full_slice()) , _range_tombstones(s, false) , _last_dk({dht::token(), partition_key::make_empty()}) + , _collector(std::make_unique(_schema)) { static_assert(sstable_compaction(), "This constructor can only be used for sstable compaction."); static_assert(!only_live(), "SSTable compaction cannot be run with emit_only_live_rows::yes."); @@ -172,6 +254,7 @@ public: _dk = &dk; _has_ck_selector = has_ck_selector(_slice.row_ranges(_schema, pk)); _empty_partition = true; + _empty_partition_in_gc_consumer = true; _rows_in_current_partition = 0; _static_row_live = false; _range_tombstones.clear(); @@ -180,27 +263,40 @@ public: _last_static_row.reset(); } - template + template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) - void consume(tombstone t, Consumer& consumer) { + void consume(tombstone t, Consumer& consumer, GCConsumer& gc_consumer) { _range_tombstones.set_partition_tombstone(t); - if (!only_live() && !can_purge_tombstone(t)) { - partition_is_not_empty(consumer); - } + if (!only_live()) { + if (can_purge_tombstone(t)) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + } else { + partition_is_not_empty(consumer); + } + } } - template + template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) - stop_iteration consume(static_row&& sr, Consumer& consumer) { + stop_iteration consume(static_row&& sr, Consumer& consumer, GCConsumer& gc_consumer) { _last_static_row = static_row(_schema, sr); auto current_tombstone = _range_tombstones.get_partition_tombstone(); - bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column, - row_tombstone(current_tombstone), - _query_time, _can_gc, _gc_before); + if constexpr (sstable_compaction()) { + _collector->start_collecting_static_row(); + } + bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column, row_tombstone(current_tombstone), + _query_time, _can_gc, _gc_before, _collector.get()); + if constexpr (sstable_compaction()) { + _collector->consume_static_row([this, &gc_consumer, current_tombstone] (static_row&& sr_garbage) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + // We are passing only dead (purged) data so pass is_live=false. + gc_consumer.consume(std::move(sr_garbage), current_tombstone, false); + }); + } _static_row_live = is_live; if (is_live || (!only_live() && !sr.empty())) { partition_is_not_empty(consumer); @@ -209,19 +305,43 @@ public: return stop_iteration::no; } - template + template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) - stop_iteration consume(clustering_row&& cr, Consumer& consumer) { + stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) { auto current_tombstone = _range_tombstones.tombstone_for_row(cr.key()); auto t = cr.tomb(); - if (t.tomb() <= current_tombstone || can_purge_tombstone(t)) { - cr.remove_tombstone(); - } t.apply(current_tombstone); - bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before); - is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker()); + + if constexpr (sstable_compaction()) { + _collector->start_collecting_clustering_row(cr.key()); + } + + { + const auto rt = cr.tomb(); + if (rt.tomb() <= current_tombstone) { + cr.remove_tombstone(); + } else if (can_purge_tombstone(rt)) { + if constexpr (sstable_compaction()) { + _collector->collect(rt); + } + cr.remove_tombstone(); + } + } + + bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before, _collector.get()); + is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker(), + _collector.get()); + + if constexpr (sstable_compaction()) { + _collector->consume_clustering_row([this, &gc_consumer, t] (clustering_row&& cr_garbage) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + // We are passing only dead (purged) data so pass is_live=false. + gc_consumer.consume(std::move(cr_garbage), t, false); + }); + } + if (only_live() && is_live) { partition_is_not_empty(consumer); auto stop = consumer.consume(std::move(cr), t, true); @@ -243,25 +363,33 @@ public: return stop_iteration::no; } - template + template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) - stop_iteration consume(range_tombstone&& rt, Consumer& consumer) { + stop_iteration consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { _range_tombstones.apply(rt); // FIXME: drop tombstone if it is fully covered by other range tombstones - if (!can_purge_tombstone(rt.tomb) && rt.tomb > _range_tombstones.get_partition_tombstone()) { - partition_is_not_empty(consumer); - return consumer.consume(std::move(rt)); - } + if (rt.tomb > _range_tombstones.get_partition_tombstone()) { + if (can_purge_tombstone(rt.tomb)) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + return gc_consumer.consume(std::move(rt)); + } else { + partition_is_not_empty(consumer); + return consumer.consume(std::move(rt)); + } + } return stop_iteration::no; } - template + template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) - stop_iteration consume_end_of_partition(Consumer& consumer) { + stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { + if (!_empty_partition_in_gc_consumer) { + gc_consumer.consume_end_of_partition(); + } if (!_empty_partition) { // #589 - Do not add extra row for statics unless we did a CK range-less query. // See comment in query @@ -280,16 +408,21 @@ public: return stop_iteration::no; } - template + template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) - auto consume_end_of_stream(Consumer& consumer) { + auto consume_end_of_stream(Consumer& consumer, GCConsumer& gc_consumer) { if (_dk) { _last_dk = *_dk; _dk = &_last_dk; } - return consumer.consume_end_of_stream(); + if constexpr (std::is_same_v, void>) { + gc_consumer.consume_end_of_stream(); + return consumer.consume_end_of_stream(); + } else { + return std::pair(consumer.consume_end_of_stream(), gc_consumer.consume_end_of_stream()); + } } /// The decorated key of the partition the compaction is positioned in. @@ -322,7 +455,8 @@ public: if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone) && _last_static_row) { // Stopping here would cause an infinite loop so ignore return value. - consume(*std::exchange(_last_static_row, {}), consumer); + noop_compacted_fragments_consumer nc; + consume(*std::exchange(_last_static_row, {}), consumer, nc); } } @@ -343,30 +477,36 @@ public: } }; -template +template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) class compact_mutation { lw_shared_ptr> _state; Consumer _consumer; + // Garbage Collected Consumer + GCConsumer _gc_consumer; public: compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, - uint32_t partition_limit, Consumer consumer) + uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) - , _consumer(std::move(consumer)) { + , _consumer(std::move(consumer)) + , _gc_consumer(std::move(gc_consumer)) { } - compact_mutation(const schema& s, gc_clock::time_point compaction_time, Consumer consumer, - std::function get_max_purgeable) + compact_mutation(const schema& s, gc_clock::time_point compaction_time, + std::function get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) - , _consumer(std::move(consumer)) { + , _consumer(std::move(consumer)) + , _gc_consumer(std::move(gc_consumer)) { } - compact_mutation(lw_shared_ptr> state, Consumer consumer) + compact_mutation(lw_shared_ptr> state, Consumer consumer, + GCConsumer gc_consumer = GCConsumer()) : _state(std::move(state)) - , _consumer(std::move(consumer)) { + , _consumer(std::move(consumer)) + , _gc_consumer(std::move(gc_consumer)) { } void consume_new_partition(const dht::decorated_key& dk) { @@ -374,27 +514,27 @@ public: } void consume(tombstone t) { - _state->consume(std::move(t), _consumer); + _state->consume(std::move(t), _consumer, _gc_consumer); } stop_iteration consume(static_row&& sr) { - return _state->consume(std::move(sr), _consumer); + return _state->consume(std::move(sr), _consumer, _gc_consumer); } stop_iteration consume(clustering_row&& cr) { - return _state->consume(std::move(cr), _consumer); + return _state->consume(std::move(cr), _consumer, _gc_consumer); } stop_iteration consume(range_tombstone&& rt) { - return _state->consume(std::move(rt), _consumer); + return _state->consume(std::move(rt), _consumer, _gc_consumer); } stop_iteration consume_end_of_partition() { - return _state->consume_end_of_partition(_consumer); + return _state->consume_end_of_partition(_consumer, _gc_consumer); } auto consume_end_of_stream() { - return _state->consume_end_of_stream(_consumer); + return _state->consume_end_of_stream(_consumer, _gc_consumer); } }; @@ -402,8 +542,8 @@ template GCC6_CONCEPT( requires CompactedFragmentsConsumer ) -struct compact_for_query : compact_mutation { - using compact_mutation::compact_mutation; +struct compact_for_query : compact_mutation { + using compact_mutation::compact_mutation; }; template @@ -411,10 +551,10 @@ using compact_for_query_state = compact_mutation_state; using compact_for_data_query_state = compact_for_query_state; -template +template GCC6_CONCEPT( - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer ) -struct compact_for_compaction : compact_mutation { - using compact_mutation::compact_mutation; +struct compact_for_compaction : compact_mutation { + using compact_mutation::compact_mutation; }; diff --git a/sstables/compaction.cc b/sstables/compaction.cc index c2bce1ca84..7fc123c537 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -868,7 +868,7 @@ future compaction::run(std::unique_ptr c) { auto cr = c->get_compacting_sstable_writer(); auto cfc = make_stable_flattened_mutations_consumer>( - *c->schema(), gc_clock::now(), std::move(cr), c->max_purgeable_func()); + *c->schema(), gc_clock::now(), c->max_purgeable_func(), std::move(cr)); auto start_time = db_clock::now(); try {