diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index cd56773a2f..1e7fa7d517 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -442,6 +442,76 @@ public: } }; +class leveled_compaction_backlog_tracker final : public compaction_backlog_tracker::impl { + // Because we can do SCTS in L0, we will account for that in the backlog. + // Whatever backlog we accumulate here will be added to the main backlog. + size_tiered_backlog_tracker _l0_scts; + std::vector _size_per_level; + uint64_t _max_sstable_size; +public: + leveled_compaction_backlog_tracker(int32_t max_sstable_size_in_mb) + : _size_per_level(leveled_manifest::MAX_LEVELS, uint64_t(0)) + , _max_sstable_size(max_sstable_size_in_mb * 1024 * 1024) + {} + + virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { + std::vector effective_size_per_level = _size_per_level; + compaction_backlog_tracker::ongoing_writes l0_partial_writes; + compaction_backlog_tracker::ongoing_compactions l0_compacted; + + for (auto& op : ow) { + auto level = op.second->level(); + if (level == 0) { + l0_partial_writes.insert(op); + } + effective_size_per_level[level] += op.second->written(); + } + + for (auto& cp : oc) { + auto level = cp.first->get_sstable_level(); + if (level == 0) { + l0_compacted.insert(cp); + } + effective_size_per_level[level] -= cp.second->compacted(); + } + + double b = _l0_scts.backlog(l0_partial_writes, l0_compacted); + // Backlog for a level: size_of_level * (max_level - n) * fan_out + // + // The fan_out is usually 10. But if the level above us is not + // fully populated-- which can happen when a level is still being born, we don't want that + // to jump abruptly. So what we will do instead is to define the fan out as the minimum + // between 10 and the number of sstables that are estimated to be there. + // + // Because of that, it's easier to write this code as an accumulator loop. If we are level + // L, for each level L + n, n > 0, we accumulate sizeof(L) * fan_out_of(L+n) + for (size_t level = 0; level < _size_per_level.size() - 1; ++level) { + auto lsize = effective_size_per_level[level]; + for (size_t next = level + 1; next < _size_per_level.size() - 1; ++next) { + auto lsize_next = effective_size_per_level[next]; + b += std::min(double(leveled_manifest::leveled_fan_out), double(lsize_next) / _max_sstable_size) * lsize; + } + } + return b; + } + + virtual void add_sstable(sstables::shared_sstable sst) override { + auto level = sst->get_sstable_level(); + _size_per_level[level] += sst->data_size(); + if (level == 0) { + _l0_scts.add_sstable(sst); + } + } + + virtual void remove_sstable(sstables::shared_sstable sst) override { + auto level = sst->get_sstable_level(); + _size_per_level[level] -= sst->data_size(); + if (level == 0) { + _l0_scts.remove_sstable(sst); + } + } +}; + struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::impl { virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { @@ -494,7 +564,7 @@ leveled_compaction_strategy::leveled_compaction_strategy(const std::map()) + , _backlog_tracker(std::make_unique(_max_sstable_size_in_mb)) { _compaction_counter.resize(leveled_manifest::MAX_LEVELS); }