diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 9160d74811..1756bf12cf 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -30,6 +30,11 @@ enum class compact_for_sstables { yes, }; +enum class compactor_output_format { + v1, + v2 +}; + template concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, clustering_row cr, range_tombstone rt, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { @@ -42,6 +47,24 @@ concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::dec obj.consume_end_of_stream(); }; +template +concept CompactedFragmentsConsumerV2 = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, + clustering_row cr, range_tombstone_change rtc, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { + obj.consume_new_partition(dk); + obj.consume(t); + { obj.consume(std::move(sr), current_tombstone, is_alive) } -> std::same_as; + { obj.consume(std::move(cr), current_row_tombstone, is_alive) } -> std::same_as; + { obj.consume(std::move(rtc)) } -> std::same_as; + { obj.consume_end_of_partition() } -> std::same_as; + obj.consume_end_of_stream(); +}; + +// TODO: I want to make this choose the right concept for OutputFormat but +// probably not worth the effort for the (hopefully) brief time for which we +// have to support both. +template +concept CompactedFragmentsConsumerWithVersion = CompactedFragmentsConsumer || CompactedFragmentsConsumerV2; + struct detached_compaction_state { ::partition_start partition_start; std::optional<::static_row> static_row; @@ -55,6 +78,7 @@ public: stop_iteration consume(static_row&& sr, tombstone, bool) { return stop_iteration::no; } stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return stop_iteration::no; } stop_iteration consume(range_tombstone&& rt) { return stop_iteration::no; } + stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() { return stop_iteration::no; } void consume_end_of_stream() {} }; @@ -135,7 +159,7 @@ struct compaction_stats { // emit_only_live::yes will cause compact_for_query to emit only live // static and clustering rows. It doesn't affect the way range tombstones are // emitted. -template +template class compact_mutation_state { const schema& _schema; gc_clock::time_point _query_time; @@ -162,14 +186,21 @@ class compact_mutation_state { std::optional _last_static_row; position_in_partition _last_clustering_pos; - tombstone _current_tombstone; + // Currently active tombstone, can be different than the tombstone emitted to + // the regular consumer (_current_emitted_tombstone) because even purged + // tombstone that are not emitted are still applied to data when compacting. + tombstone _effective_tombstone; + // Track last emitted tombstone to regular and gc consumers respectively. + // Used to determine whether any active tombstones need closing at EOS. + tombstone _current_emitted_tombstone; + tombstone _current_emitted_gc_tombstone; std::unique_ptr _collector; compaction_stats _stats; private: template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { if (rt.tomb <= _partition_tombstone) { return stop_iteration::no; @@ -183,13 +214,44 @@ private: } } template - tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) { - if (_rt_assembler.needs_flush()) { - if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(ckey))) { - do_consume(std::move(*rt_opt), consumer, gc_consumer); + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { + stop_iteration gc_consumer_stop = stop_iteration::no; + stop_iteration consumer_stop = stop_iteration::no; + if (rtc.tombstone() <= _partition_tombstone) { + rtc.set_tombstone({}); + } + _effective_tombstone = rtc.tombstone(); + const auto can_purge = rtc.tombstone() && can_purge_tombstone(rtc.tombstone()); + if (can_purge || _current_emitted_gc_tombstone) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + auto tomb = can_purge ? rtc.tombstone() : tombstone{}; + _current_emitted_gc_tombstone = tomb; + gc_consumer_stop = gc_consumer.consume(range_tombstone_change(rtc.position(), tomb)); + if (can_purge) { + rtc.set_tombstone({}); } } - return std::max(_partition_tombstone, _rt_assembler.get_current_tombstone()); + // If we have a previous active tombstone we emit the current one even if it is purged. + if (!can_purge || _current_emitted_tombstone) { + partition_is_not_empty(consumer); + _current_emitted_tombstone = rtc.tombstone(); + consumer_stop = consumer.consume(std::move(rtc)); + } + return gc_consumer_stop || consumer_stop; + } + template + tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) { + if constexpr (OutputFormat == compactor_output_format::v2) { + return std::max(_partition_tombstone, _effective_tombstone); + } else { + if (_rt_assembler.needs_flush()) { + if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(ckey))) { + do_consume(std::move(*rt_opt), consumer, gc_consumer); + } + } + return std::max(_partition_tombstone, _rt_assembler.get_current_tombstone()); + } } static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; @@ -312,11 +374,13 @@ public: _gc_before = std::nullopt; _last_static_row.reset(); _last_clustering_pos = position_in_partition::before_all_clustered_rows(); - _current_tombstone = {}; + _effective_tombstone = {}; + _current_emitted_tombstone = {}; + _current_emitted_gc_tombstone = {}; } template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion void consume(tombstone t, Consumer& consumer, GCConsumer& gc_consumer) { _partition_tombstone = t; if (!only_live()) { @@ -329,13 +393,13 @@ public: } template - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion void force_partition_not_empty(Consumer& consumer) { partition_is_not_empty(consumer); } template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion stop_iteration consume(static_row&& sr, Consumer& consumer, GCConsumer& gc_consumer) { _last_static_row = static_row(_schema, sr); auto current_tombstone = _partition_tombstone; @@ -366,7 +430,7 @@ public: } template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) { if (!sstable_compaction()) { _last_clustering_pos = cr.position(); @@ -433,27 +497,40 @@ public: } template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { if (!sstable_compaction()) { _last_clustering_pos = rtc.position(); } ++_stats.range_tombstones; - if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - return do_consume(std::move(*rt_opt), consumer, gc_consumer); + if constexpr (OutputFormat == compactor_output_format::v1) { + _effective_tombstone = rtc.tombstone(); + if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { + return do_consume(std::move(*rt_opt), consumer, gc_consumer); + } + } else { + do_consume(std::move(rtc), consumer, gc_consumer); } return stop_iteration::no; } template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { - if (_current_tombstone = _rt_assembler.get_current_tombstone(); _current_tombstone) { - if (auto rt_opt = _rt_assembler.consume(_schema, range_tombstone_change(position_in_partition::after_key(_last_clustering_pos), tombstone{}))) { - do_consume(std::move(*rt_opt), consumer, gc_consumer); + if (_effective_tombstone) { + auto rtc = range_tombstone_change(position_in_partition::after_key(_last_clustering_pos), tombstone{}); + if constexpr (OutputFormat == compactor_output_format::v1) { + if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { + do_consume(std::move(*rt_opt), consumer, gc_consumer); + } + _rt_assembler.on_end_of_stream(); + } else { + // do_consume() overwrites _effective_tombstone with {}, so save and restore it. + auto prev_tombstone = _effective_tombstone; + do_consume(std::move(rtc), consumer, gc_consumer); + _effective_tombstone = prev_tombstone; } } - _rt_assembler.on_end_of_stream(); if (!_empty_partition_in_gc_consumer) { gc_consumer.consume_end_of_partition(); } @@ -477,7 +554,7 @@ public: } template - requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion auto consume_end_of_stream(Consumer& consumer, GCConsumer& gc_consumer) { if (_dk) { _last_dk = *_dk; @@ -501,7 +578,7 @@ public: /// partition-header and static row if there are clustering rows or range /// tombstones left in the partition. template - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumerWithVersion void start_new_page(uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, @@ -522,8 +599,11 @@ public: // Stopping here would cause an infinite loop so ignore return value. consume(*std::exchange(_last_static_row, {}), consumer, nc); } - if (_current_tombstone) { - if (auto rt_opt = _rt_assembler.consume(_schema, range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), std::exchange(_current_tombstone, {})))) { + if (_effective_tombstone) { + auto rtc = range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _effective_tombstone); + if constexpr (OutputFormat == compactor_output_format::v2) { + do_consume(std::move(rtc), consumer, nc); + } else if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { do_consume(std::move(*rt_opt), consumer, nc); } } @@ -542,8 +622,8 @@ public: /// allows the compaction state to be stored in the compacted reader. detached_compaction_state detach_state() && { partition_start ps(std::move(_last_dk), _partition_tombstone); - if (_current_tombstone) { - return {std::move(ps), std::move(_last_static_row), range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _current_tombstone)}; + if (_effective_tombstone) { + return {std::move(ps), std::move(_last_static_row), range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _effective_tombstone)}; } else { return {std::move(ps), std::move(_last_static_row), std::optional{}}; } @@ -612,6 +692,66 @@ public: } }; +template +requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 +class compact_mutation_v2 { + lw_shared_ptr> _state; + Consumer _consumer; + // Garbage Collected Consumer + GCConsumer _gc_consumer; + +public: + compact_mutation_v2(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit, + uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) + : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) + , _consumer(std::move(consumer)) + , _gc_consumer(std::move(gc_consumer)) { + } + + compact_mutation_v2(const schema& s, gc_clock::time_point compaction_time, + std::function get_max_purgeable, + Consumer consumer, GCConsumer gc_consumer = GCConsumer()) + : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) + , _consumer(std::move(consumer)) + , _gc_consumer(std::move(gc_consumer)) { + } + + compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, + GCConsumer gc_consumer = GCConsumer()) + : _state(std::move(state)) + , _consumer(std::move(consumer)) + , _gc_consumer(std::move(gc_consumer)) { + } + + void consume_new_partition(const dht::decorated_key& dk) { + _state->consume_new_partition(dk); + } + + void consume(tombstone t) { + _state->consume(std::move(t), _consumer, _gc_consumer); + } + + stop_iteration consume(static_row&& sr) { + return _state->consume(std::move(sr), _consumer, _gc_consumer); + } + + stop_iteration consume(clustering_row&& cr) { + return _state->consume(std::move(cr), _consumer, _gc_consumer); + } + + stop_iteration consume(range_tombstone_change&& rtc) { + return _state->consume(std::move(rtc), _consumer, _gc_consumer); + } + + stop_iteration consume_end_of_partition() { + return _state->consume_end_of_partition(_consumer, _gc_consumer); + } + + auto consume_end_of_stream() { + return _state->consume_end_of_stream(_consumer, _gc_consumer); + } +}; + template requires CompactedFragmentsConsumer struct compact_for_query : compact_mutation { @@ -626,3 +766,18 @@ requires CompactedFragmentsConsumer && CompactedFragmentsConsumer { using compact_mutation::compact_mutation; }; + +template +requires CompactedFragmentsConsumerV2 +struct compact_for_query_v2 : compact_mutation_v2 { + using compact_mutation_v2::compact_mutation_v2; +}; + +template +using compact_for_query_state_v2 = compact_mutation_state; + +template +requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 +struct compact_for_compaction_v2 : compact_mutation_v2 { + using compact_mutation_v2::compact_mutation_v2; +};