mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
sstables: Move compaction_write_monitor earlier in the file
This will used by followup patches. Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
This commit is contained in:
@@ -163,6 +163,64 @@ static std::vector<shared_sstable> get_uncompacting_sstables(column_family& cf,
|
||||
|
||||
class compaction;
|
||||
|
||||
class compaction_write_monitor final : public sstables::write_monitor, public backlog_write_progress_manager {
|
||||
sstables::shared_sstable _sst;
|
||||
column_family& _cf;
|
||||
const sstables::writer_offset_tracker* _tracker = nullptr;
|
||||
uint64_t _progress_seen = 0;
|
||||
api::timestamp_type _maximum_timestamp;
|
||||
unsigned _sstable_level;
|
||||
public:
|
||||
compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp, unsigned sstable_level)
|
||||
: _sst(sst)
|
||||
, _cf(cf)
|
||||
, _maximum_timestamp(max_timestamp)
|
||||
, _sstable_level(sstable_level)
|
||||
{}
|
||||
|
||||
~compaction_write_monitor() {
|
||||
if (_sst) {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
|
||||
}
|
||||
}
|
||||
|
||||
virtual void on_write_started(const sstables::writer_offset_tracker& tracker) override {
|
||||
_tracker = &tracker;
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().register_partially_written_sstable(_sst, *this);
|
||||
}
|
||||
|
||||
virtual void on_data_write_completed() override {
|
||||
if (_tracker) {
|
||||
_progress_seen = _tracker->offset;
|
||||
_tracker = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
virtual void write_failed() override {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(std::move(_sst));
|
||||
}
|
||||
|
||||
virtual uint64_t written() const {
|
||||
if (_tracker) {
|
||||
return _tracker->offset;
|
||||
}
|
||||
return _progress_seen;
|
||||
}
|
||||
|
||||
void add_sstable() {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().add_sstable(_sst);
|
||||
_sst = {};
|
||||
}
|
||||
|
||||
api::timestamp_type maximum_timestamp() const override {
|
||||
return _maximum_timestamp;
|
||||
}
|
||||
|
||||
unsigned level() const override {
|
||||
return _sstable_level;
|
||||
}
|
||||
};
|
||||
|
||||
struct compaction_writer {
|
||||
sstable_writer writer;
|
||||
shared_sstable sst;
|
||||
@@ -273,64 +331,6 @@ private:
|
||||
std::deque<compaction_read_monitor> _generated_monitors;
|
||||
};
|
||||
|
||||
class compaction_write_monitor final : public sstables::write_monitor, public backlog_write_progress_manager {
|
||||
sstables::shared_sstable _sst;
|
||||
column_family& _cf;
|
||||
const sstables::writer_offset_tracker* _tracker = nullptr;
|
||||
uint64_t _progress_seen = 0;
|
||||
api::timestamp_type _maximum_timestamp;
|
||||
unsigned _sstable_level;
|
||||
public:
|
||||
compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp, unsigned sstable_level)
|
||||
: _sst(sst)
|
||||
, _cf(cf)
|
||||
, _maximum_timestamp(max_timestamp)
|
||||
, _sstable_level(sstable_level)
|
||||
{}
|
||||
|
||||
~compaction_write_monitor() {
|
||||
if (_sst) {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
|
||||
}
|
||||
}
|
||||
|
||||
virtual void on_write_started(const sstables::writer_offset_tracker& tracker) override {
|
||||
_tracker = &tracker;
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().register_partially_written_sstable(_sst, *this);
|
||||
}
|
||||
|
||||
virtual void on_data_write_completed() override {
|
||||
if (_tracker) {
|
||||
_progress_seen = _tracker->offset;
|
||||
_tracker = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
virtual void write_failed() override {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(std::move(_sst));
|
||||
}
|
||||
|
||||
virtual uint64_t written() const {
|
||||
if (_tracker) {
|
||||
return _tracker->offset;
|
||||
}
|
||||
return _progress_seen;
|
||||
}
|
||||
|
||||
void add_sstable() {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().add_sstable(_sst);
|
||||
_sst = {};
|
||||
}
|
||||
|
||||
api::timestamp_type maximum_timestamp() const override {
|
||||
return _maximum_timestamp;
|
||||
}
|
||||
|
||||
unsigned level() const override {
|
||||
return _sstable_level;
|
||||
}
|
||||
};
|
||||
|
||||
// Writes a temporary sstable run containing only garbage collected data.
|
||||
// Whenever regular compaction writer seals a new sstable, this writer will flush a new sstable as well,
|
||||
// right before there's an attempt to release exhausted sstables earlier.
|
||||
|
||||
Reference in New Issue
Block a user