diff --git a/compaction/compaction.cc b/compaction/compaction.cc index f10f75ae29..cd8e856006 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -273,10 +273,37 @@ struct compaction_writer { class compacting_sstable_writer { compaction& _c; std::optional _compaction_writer = {}; + using creator_func_t = std::function; + using stop_func_t = std::function; + creator_func_t _create_compaction_writer; + stop_func_t _stop_compaction_writer; + std::optional> _stop_request_observer; + bool _unclosed_partition = false; private: inline void maybe_abort_compaction(); + + utils::observer<> make_stop_request_observer(utils::observable<>& sro) { + return sro.observe([this] () mutable { + assert(!_unclosed_partition); + consume_end_of_stream(); + }); + } public: - explicit compacting_sstable_writer(compaction& c) : _c(c) { } + explicit compacting_sstable_writer(compaction& c, creator_func_t cpw, stop_func_t scw) + : _c(c) + , _create_compaction_writer(std::move(cpw)) + , _stop_compaction_writer(std::move(scw)) { + } + explicit compacting_sstable_writer(compaction& c, creator_func_t cpw, stop_func_t scw, utils::observable<>& sro) + : _c(c) + , _create_compaction_writer(std::move(cpw)) + , _stop_compaction_writer(std::move(scw)) + , _stop_request_observer(make_stop_request_observer(sro)) { + } + compacting_sstable_writer(compacting_sstable_writer&& other); + compacting_sstable_writer& operator=(const compacting_sstable_writer&) = delete; + compacting_sstable_writer(const compacting_sstable_writer&) = delete; + void consume_new_partition(const dht::decorated_key& dk); void consume(tombstone t) { _compaction_writer->writer.consume(t); } @@ -365,89 +392,6 @@ private: std::unordered_map _generated_monitors; }; -// Writes a temporary sstable run containing only garbage collected data. -// Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well, -// right before there's an attempt to release exhausted sstables earlier. -// Generated sstables will be temporarily added to table to make sure that a compaction crash will not -// result in data resurrection. -// When compaction finishes, all the temporary sstables generated here will be deleted and removed -// from table's sstable set. -class garbage_collected_sstable_writer { -public: - // Data for GC writer is stored separately to allow compaction class to communicate directly - // with garbage_collected_sstable_writer which is moved into mutation_compaction, making it - // unreachable after the compaction process has started. - class data { - compaction* _c = nullptr; - // Garbage collected sstables that are sealed but were not added to SSTable set yet. - std::vector _unused_garbage_collected_sstables; - // Garbage collected sstables that were added to SSTable set and should be eventually removed from it. - std::vector _used_garbage_collected_sstables; - std::optional _compaction_writer; - utils::UUID _run_identifier = utils::make_random_uuid(); - sstable_writer& writer() { - return _compaction_writer->writer; - } - - public: - explicit data(compaction& c) : _c(&c) { - } - - data& operator=(const data&) = delete; - data(const data&) = delete; - - void maybe_create_new_sstable_writer(); - void finish_sstable_writer(); - - // Retrieves all unused garbage collected sstables that will be subsequently added - // to the SSTable set, and mark them as used. - std::vector consume_unused_garbage_collected_sstables() { - auto unused = std::exchange(_unused_garbage_collected_sstables, {}); - _used_garbage_collected_sstables.insert(_used_garbage_collected_sstables.end(), unused.begin(), unused.end()); - return unused; - } - - const std::vector& used_garbage_collected_sstables() const { - return _used_garbage_collected_sstables; - } - - friend class garbage_collected_sstable_writer; - }; -private: - garbage_collected_sstable_writer::data* _data = nullptr; - sstable_writer& writer() { - return _data->writer(); - } -public: - explicit garbage_collected_sstable_writer() = default; - explicit garbage_collected_sstable_writer(garbage_collected_sstable_writer::data& data) : _data(&data) {} - - garbage_collected_sstable_writer& operator=(const garbage_collected_sstable_writer&) = delete; - garbage_collected_sstable_writer(const garbage_collected_sstable_writer&) = delete; - - garbage_collected_sstable_writer(garbage_collected_sstable_writer&& other) = default; - garbage_collected_sstable_writer& operator=(garbage_collected_sstable_writer&& other) = default; - - void consume_new_partition(const dht::decorated_key& dk) { - _data->maybe_create_new_sstable_writer(); - writer().consume_new_partition(dk); - } - - void consume(tombstone t) { writer().consume(t); } - stop_iteration consume(static_row&& sr, tombstone, bool) { return writer().consume(std::move(sr)); } - stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return writer().consume(std::move(cr)); } - stop_iteration consume(range_tombstone&& rt) { return writer().consume(std::move(rt)); } - - stop_iteration consume_end_of_partition() { - writer().consume_end_of_partition(); - return stop_iteration::no; - } - - void consume_end_of_stream() { - _data->finish_sstable_writer(); - } -}; - class formatted_sstables_list { bool _include_origin = true; std::vector _ssts; @@ -503,7 +447,6 @@ protected: encoding_stats_collector _stats_collector; bool _contains_multi_fragment_runs = false; mutation_source_metadata _ms_metadata = {}; - garbage_collected_sstable_writer::data _gc_sstable_writer_data; compaction_sstable_replacer_fn _replacer; utils::UUID _run_identifier; ::io_priority_class _io_priority; @@ -512,6 +455,11 @@ protected: // used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys. std::optional _selector; std::unordered_set _compacting_for_max_purgeable_func; + // Garbage collected sstables that are sealed but were not added to SSTable set yet. + std::vector _unused_garbage_collected_sstables; + // Garbage collected sstables that were added to SSTable set and should be eventually removed from it. + std::vector _used_garbage_collected_sstables; + utils::observable<> _stop_request_observable; private: compaction_data& init_compaction_data(compaction_data& cdata, const compaction_descriptor& descriptor) const { cdata.compaction_fan_in = descriptor.fan_in(); @@ -529,7 +477,6 @@ protected: , _type(descriptor.options.type()) , _max_sstable_size(descriptor.max_sstable_bytes) , _sstable_level(descriptor.level) - , _gc_sstable_writer_data(*this) , _replacer(std::move(descriptor.replacer)) , _run_identifier(descriptor.run_identifier) , _io_priority(descriptor.io_priority) @@ -605,6 +552,51 @@ protected: bool tombstone_expiration_enabled() const { return bool(_sstable_set); } + + compaction_writer create_gc_compaction_writer() const { + auto sst = _sstable_creator(this_shard_id()); + + auto&& priority = _io_priority; + auto monitor = std::make_unique(sst, _cf, maximum_timestamp(), _sstable_level); + sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer("garbage_collection"); + cfg.run_identifier = _run_identifier; + cfg.monitor = monitor.get(); + auto writer = sst->get_writer(*schema(), partitions_per_sstable(), cfg, get_encoding_stats(), priority); + return compaction_writer(std::move(monitor), std::move(writer), std::move(sst)); + } + + void stop_gc_compaction_writer(compaction_writer* c_writer) { + c_writer->writer.consume_end_of_stream(); + auto sst = c_writer->sst; + sst->open_data().get0(); + _unused_garbage_collected_sstables.push_back(std::move(sst)); + } + + // Writes a temporary sstable run containing only garbage collected data. + // Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well, + // right before there's an attempt to release exhausted sstables earlier. + // Generated sstables will be temporarily added to table to make sure that a compaction crash will not + // result in data resurrection. + // When compaction finishes, all the temporary sstables generated here will be deleted and removed + // from table's sstable set. + compacting_sstable_writer get_gc_compacting_sstable_writer() { + return compacting_sstable_writer(*this, + [this] (const dht::decorated_key&) { return create_gc_compaction_writer(); }, + [this] (compaction_writer* cw) { stop_gc_compaction_writer(cw); }, + _stop_request_observable); + } + + // Retrieves all unused garbage collected sstables that will be subsequently added + // to the SSTable set, and mark them as used. + std::vector consume_unused_garbage_collected_sstables() { + auto unused = std::exchange(_unused_garbage_collected_sstables, {}); + _used_garbage_collected_sstables.insert(_used_garbage_collected_sstables.end(), unused.begin(), unused.end()); + return unused; + } + + const std::vector& used_garbage_collected_sstables() const { + return _used_garbage_collected_sstables; + } public: compaction& operator=(const compaction&) = delete; compaction(const compaction&) = delete; @@ -622,9 +614,7 @@ private: return _cf.get_compaction_strategy().make_sstable_set(_schema); } - template - requires CompactedFragmentsConsumer - future<> setup(GCConsumer gc_consumer) { + void setup() { auto ssts = make_lw_shared(make_sstable_set_for_input()); formatted_sstables_list formatted_msg; auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds()); @@ -673,17 +663,30 @@ private: _ms_metadata.min_timestamp = timestamp_tracker.min(); _ms_metadata.max_timestamp = timestamp_tracker.max(); + } + + future<> consume() { auto now = gc_clock::now(); - auto consumer = make_interposer_consumer([this, gc_consumer = std::move(gc_consumer), now] (flat_mutation_reader reader) mutable + auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader reader) mutable { - return seastar::async([this, reader = std::move(reader), gc_consumer = std::move(gc_consumer), now] () mutable { + return seastar::async([this, reader = std::move(reader), now] () mutable { auto close_reader = deferred_close(reader); - using compact_mutations = compact_for_compaction; + if (enable_garbage_collected_sstable_writer()) { + using compact_mutations = compact_for_compaction; + auto cfc = make_stable_flattened_mutations_consumer(*schema(), now, + max_purgeable_func(), + get_compacting_sstable_writer(), + get_gc_compacting_sstable_writer()); + + reader.consume_in_thread(std::move(cfc)); + return; + } + using compact_mutations = compact_for_compaction; auto cfc = make_stable_flattened_mutations_consumer(*schema(), now, - max_purgeable_func(), - get_compacting_sstable_writer(), - std::move(gc_consumer)); + max_purgeable_func(), + get_compacting_sstable_writer(), + noop_compacted_fragments_consumer()); reader.consume_in_thread(std::move(cfc)); }); @@ -755,7 +758,9 @@ private: virtual void stop_sstable_writer(compaction_writer* writer) = 0; compacting_sstable_writer get_compacting_sstable_writer() { - return compacting_sstable_writer(*this); + return compacting_sstable_writer(*this, + [this] (const dht::decorated_key& dk) { return create_compaction_writer(dk); }, + [this] (compaction_writer* cw) { stop_sstable_writer(cw); }); } const schema_ptr& schema() const { @@ -805,25 +810,27 @@ protected: log(log_level::trace, std::move(fmt), std::forward(args)...); } public: - garbage_collected_sstable_writer make_garbage_collected_sstable_writer() { - return garbage_collected_sstable_writer(_gc_sstable_writer_data); - } - bool enable_garbage_collected_sstable_writer() const { // FIXME: Disable GC writer if interposer consumer is enabled until they both can work simultaneously. // More details can be found at https://github.com/scylladb/scylla/issues/6472 return _contains_multi_fragment_runs && !use_interposer_consumer(); } - template - requires CompactedFragmentsConsumer - static future run(std::unique_ptr c, GCConsumer gc_consumer = GCConsumer()); + static future run(std::unique_ptr c); friend class compacting_sstable_writer; - friend class garbage_collected_sstable_writer; - friend class garbage_collected_sstable_writer::data; }; +compacting_sstable_writer::compacting_sstable_writer(compacting_sstable_writer&& other) + : _c(other._c) + , _compaction_writer(std::move(other._compaction_writer)) + , _create_compaction_writer(std::move(other._create_compaction_writer)) + , _stop_compaction_writer(std::move(other._stop_compaction_writer)) { + if (std::exchange(other._stop_request_observer, std::nullopt)) { + _stop_request_observer = make_stop_request_observer(_c._stop_request_observable); + } +} + void compacting_sstable_writer::maybe_abort_compaction() { if (_c._cdata.is_stop_requested()) [[unlikely]] { // Compaction manager will catch this exception and re-schedule the compaction. @@ -834,19 +841,21 @@ void compacting_sstable_writer::maybe_abort_compaction() { void compacting_sstable_writer::consume_new_partition(const dht::decorated_key& dk) { maybe_abort_compaction(); if (!_compaction_writer) { - _compaction_writer = _c.create_compaction_writer(dk); + _compaction_writer = _create_compaction_writer(dk); } _c.on_new_partition(); _compaction_writer->writer.consume_new_partition(dk); _c._cdata.total_keys_written++; + _unclosed_partition = true; } stop_iteration compacting_sstable_writer::consume_end_of_partition() { auto ret = _compaction_writer->writer.consume_end_of_partition(); + _unclosed_partition = false; if (ret == stop_iteration::yes) { // stop sstable writer being currently used. - _c.stop_sstable_writer(&*_compaction_writer); + _stop_compaction_writer(&*_compaction_writer); _compaction_writer = std::nullopt; } return ret; @@ -854,35 +863,11 @@ stop_iteration compacting_sstable_writer::consume_end_of_partition() { void compacting_sstable_writer::consume_end_of_stream() { if (_compaction_writer) { - _c.stop_sstable_writer(&*_compaction_writer); + _stop_compaction_writer(&*_compaction_writer); _compaction_writer = std::nullopt; } } -void garbage_collected_sstable_writer::data::maybe_create_new_sstable_writer() { - if (!_compaction_writer) { - auto sst = _c->_sstable_creator(this_shard_id()); - - auto&& priority = _c->_io_priority; - auto monitor = std::make_unique(sst, _c->_cf, _c->maximum_timestamp(), _c->_sstable_level); - sstable_writer_config cfg = _c->_cf.get_sstables_manager().configure_writer("garbage_collection"); - cfg.run_identifier = _run_identifier; - cfg.monitor = monitor.get(); - auto writer = sst->get_writer(*_c->schema(), _c->partitions_per_sstable(), cfg, _c->get_encoding_stats(), priority); - _compaction_writer.emplace(std::move(monitor), std::move(writer), std::move(sst)); - } -} - -void garbage_collected_sstable_writer::data::finish_sstable_writer() { - if (_compaction_writer) { - writer().consume_end_of_stream(); - auto sst = std::move(_compaction_writer->sst); - sst->open_data().get0(); - _compaction_writer = std::nullopt; - _unused_garbage_collected_sstables.push_back(std::move(sst)); - } -} - class reshape_compaction : public compaction { public: reshape_compaction(column_family& cf, compaction_descriptor descriptor, compaction_data& cdata) @@ -1010,15 +995,17 @@ private: _compacting->erase(sst); }); // Make sure SSTable created by garbage collected writer is made available - // before exhausted SSTable is released, so as to prevent data resurrection. - _gc_sstable_writer_data.finish_sstable_writer(); + // before exhausted SSTable is released, so to prevent data resurrection. + _stop_request_observable(); + // Added Garbage collected SSTables to list of unused SSTables that will be added // to SSTable set. GC SSTables should be added before compaction completes because // a failure could result in data resurrection if data is not made available. - auto unused_gc_sstables = _gc_sstable_writer_data.consume_unused_garbage_collected_sstables(); + auto unused_gc_sstables = consume_unused_garbage_collected_sstables(); _new_unused_sstables.insert(_new_unused_sstables.end(), unused_gc_sstables.begin(), unused_gc_sstables.end()); auto exhausted_ssts = std::vector(exhausted, _sstables.end()); + log_debug("Replacing earlier exhausted sstable(s) {} by new sstable {}", formatted_sstables_list(exhausted_ssts, false), sst->get_filename()); _replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables))); _sstables.erase(exhausted, _sstables.end()); _monitor_generator.remove_exhausted_sstables(exhausted_ssts); @@ -1026,13 +1013,13 @@ private: } void replace_remaining_exhausted_sstables() { - if (!_sstables.empty() || !_gc_sstable_writer_data.used_garbage_collected_sstables().empty()) { + if (!_sstables.empty() || !used_garbage_collected_sstables().empty()) { std::vector old_sstables; std::move(_sstables.begin(), _sstables.end(), std::back_inserter(old_sstables)); // Remove Garbage Collected SSTables from the SSTable set if any was previously added. - auto& used_garbage_collected_sstables = _gc_sstable_writer_data.used_garbage_collected_sstables(); - old_sstables.insert(old_sstables.end(), used_garbage_collected_sstables.begin(), used_garbage_collected_sstables.end()); + auto& used_gc_sstables = used_garbage_collected_sstables(); + old_sstables.insert(old_sstables.end(), used_gc_sstables.begin(), used_gc_sstables.end()); _replacer(get_compaction_completion_desc(std::move(old_sstables), std::move(_new_unused_sstables))); } @@ -1565,11 +1552,11 @@ public: } }; -template -requires CompactedFragmentsConsumer -future compaction::run(std::unique_ptr c, GCConsumer gc_consumer) { - return seastar::async([c = std::move(c), gc_consumer = std::move(gc_consumer)] () mutable { - auto consumer = c->setup(std::move(gc_consumer)); +future compaction::run(std::unique_ptr c) { + return seastar::async([c = std::move(c)] () mutable { + c->setup(); + auto consumer = c->consume(); + auto start_time = db_clock::now(); try { consumer.get(); @@ -1716,12 +1703,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, compaction_data& cd // Bypass the usual compaction machinery for dry-mode scrub return scrub_sstables_validate_mode(std::move(descriptor), cdata, cf); } - auto c = make_compaction(cf, std::move(descriptor), cdata); - if (c->enable_garbage_collected_sstable_writer()) { - auto gc_writer = c->make_garbage_collected_sstable_writer(); - return compaction::run(std::move(c), std::move(gc_writer)); - } - return compaction::run(std::move(c)); + return compaction::run(make_compaction(cf, std::move(descriptor), cdata)); } std::unordered_set