diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 7a6387391e..f4de750745 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -163,6 +163,64 @@ static std::vector get_uncompacting_sstables(column_family& cf, class compaction; +class compaction_write_monitor final : public sstables::write_monitor, public backlog_write_progress_manager { + sstables::shared_sstable _sst; + column_family& _cf; + const sstables::writer_offset_tracker* _tracker = nullptr; + uint64_t _progress_seen = 0; + api::timestamp_type _maximum_timestamp; + unsigned _sstable_level; +public: + compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp, unsigned sstable_level) + : _sst(sst) + , _cf(cf) + , _maximum_timestamp(max_timestamp) + , _sstable_level(sstable_level) + {} + + ~compaction_write_monitor() { + if (_sst) { + _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst); + } + } + + virtual void on_write_started(const sstables::writer_offset_tracker& tracker) override { + _tracker = &tracker; + _cf.get_compaction_strategy().get_backlog_tracker().register_partially_written_sstable(_sst, *this); + } + + virtual void on_data_write_completed() override { + if (_tracker) { + _progress_seen = _tracker->offset; + _tracker = nullptr; + } + } + + virtual void write_failed() override { + _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(std::move(_sst)); + } + + virtual uint64_t written() const { + if (_tracker) { + return _tracker->offset; + } + return _progress_seen; + } + + void add_sstable() { + _cf.get_compaction_strategy().get_backlog_tracker().add_sstable(_sst); + _sst = {}; + } + + api::timestamp_type maximum_timestamp() const override { + return _maximum_timestamp; + } + + unsigned level() const override { + return _sstable_level; + } +}; + struct compaction_writer { sstable_writer writer; shared_sstable sst; @@ -273,64 +331,6 @@ private: std::deque _generated_monitors; }; -class compaction_write_monitor final : public sstables::write_monitor, public backlog_write_progress_manager { - sstables::shared_sstable _sst; - column_family& _cf; - const sstables::writer_offset_tracker* _tracker = nullptr; - uint64_t _progress_seen = 0; - api::timestamp_type _maximum_timestamp; - unsigned _sstable_level; -public: - compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp, unsigned sstable_level) - : _sst(sst) - , _cf(cf) - , _maximum_timestamp(max_timestamp) - , _sstable_level(sstable_level) - {} - - ~compaction_write_monitor() { - if (_sst) { - _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst); - } - } - - virtual void on_write_started(const sstables::writer_offset_tracker& tracker) override { - _tracker = &tracker; - _cf.get_compaction_strategy().get_backlog_tracker().register_partially_written_sstable(_sst, *this); - } - - virtual void on_data_write_completed() override { - if (_tracker) { - _progress_seen = _tracker->offset; - _tracker = nullptr; - } - } - - virtual void write_failed() override { - _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(std::move(_sst)); - } - - virtual uint64_t written() const { - if (_tracker) { - return _tracker->offset; - } - return _progress_seen; - } - - void add_sstable() { - _cf.get_compaction_strategy().get_backlog_tracker().add_sstable(_sst); - _sst = {}; - } - - api::timestamp_type maximum_timestamp() const override { - return _maximum_timestamp; - } - - unsigned level() const override { - return _sstable_level; - } -}; - // Writes a temporary sstable run containing only garbage collected data. // Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well, // right before there's an attempt to release exhausted sstables earlier.