diff --git a/compaction/compaction_backlog_manager.hh b/compaction/compaction_backlog_manager.hh index 15b8db6ee2..b996f7bc28 100644 --- a/compaction/compaction_backlog_manager.hh +++ b/compaction/compaction_backlog_manager.hh @@ -60,8 +60,7 @@ public: using ongoing_compactions = std::unordered_map; struct impl { - virtual void add_sstable(sstables::shared_sstable sst) = 0; - virtual void remove_sstable(sstables::shared_sstable sst) = 0; + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) = 0; virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0; virtual ~impl() { } }; @@ -72,8 +71,7 @@ public: ~compaction_backlog_tracker(); double backlog() const; - void add_sstable(sstables::shared_sstable sst); - void remove_sstable(sstables::shared_sstable sst); + void replace_sstables(const std::vector& old_ssts, const std::vector& new_ssts); void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp); void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp); void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true); diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index ff581f267a..62471772cb 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -237,8 +237,7 @@ private: virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return _added_backlog * _available_memory; } - virtual void add_sstable(sstables::shared_sstable sst) override { } - virtual void remove_sstable(sstables::shared_sstable sst) override { } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) { @@ -1150,27 +1149,23 @@ double compaction_backlog_tracker::backlog() const { return disabled() ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions); } -void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) { - if (disabled() || !sstable_belongs_to_tracker(sst)) { +void compaction_backlog_tracker::replace_sstables(const std::vector& old_ssts, const std::vector& new_ssts) { + if (disabled()) { return; } - _ongoing_writes.erase(sst); - try { - _impl->add_sstable(std::move(sst)); - } catch (...) { - cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception()); - disable(); - } -} + auto filter_and_revert_charges = [this] (const std::vector& ssts) { + std::vector ret; + for (auto& sst : ssts) { + if (sstable_belongs_to_tracker(sst)) { + revert_charges(sst); + ret.push_back(sst); + } + } + return ret; + }; -void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { - if (disabled() || !sstable_belongs_to_tracker(sst)) { - return; - } - - _ongoing_compactions.erase(sst); try { - _impl->remove_sstable(std::move(sst)); + _impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts)); } catch (...) { cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception()); disable(); diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 764082f996..b7c74849b3 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -131,17 +131,18 @@ double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::on return b > 0 ? b : 0; } -void size_tiered_backlog_tracker::add_sstable(sstables::shared_sstable sst) { - if (sst->data_size() > 0) { - _total_bytes += sst->data_size(); - _sstables_backlog_contribution += sst->data_size() * log4(sst->data_size()); +void size_tiered_backlog_tracker::replace_sstables(std::vector old_ssts, std::vector new_ssts) { + for (auto& sst : new_ssts) { + if (sst->data_size() > 0) { + _total_bytes += sst->data_size(); + _sstables_backlog_contribution += sst->data_size() * log4(sst->data_size()); + } } -} - -void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { - if (sst->data_size() > 0) { - _total_bytes -= sst->data_size(); - _sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size()); + for (auto& sst : old_ssts) { + if (sst->data_size() > 0) { + _total_bytes -= sst->data_size(); + _sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size()); + } } } @@ -218,18 +219,29 @@ public: return b; } - virtual void add_sstable(sstables::shared_sstable sst) override { - auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); - _windows[bound].add_sstable(sst); - } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override { + struct replacement { + std::vector old_ssts; + std::vector new_ssts; + }; + std::unordered_map per_window_replacement; - virtual void remove_sstable(sstables::shared_sstable sst) override { - auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); - auto it = _windows.find(bound); - if (it != _windows.end()) { - it->second.remove_sstable(sst); - if (it->second.total_bytes() <= 0) { - _windows.erase(it); + for (auto& sst : new_ssts) { + auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); + per_window_replacement[bound].new_ssts.push_back(std::move(sst)); + } + for (auto& sst : old_ssts) { + auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); + if (_windows.contains(bound)) { + per_window_replacement[bound].old_ssts.push_back(std::move(sst)); + } + } + + for (auto& [bound, r] : per_window_replacement) { + auto& w = _windows[bound]; + w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts)); + if (w.total_bytes() <= 0) { + _windows.erase(bound); } } } @@ -288,20 +300,23 @@ public: return b; } - virtual void add_sstable(sstables::shared_sstable sst) override { - auto level = sst->get_sstable_level(); - _size_per_level[level] += sst->data_size(); - if (level == 0) { - _l0_scts.add_sstable(sst); + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override { + std::vector l0_old_ssts, l0_new_ssts; + for (auto& sst : new_ssts) { + auto level = sst->get_sstable_level(); + _size_per_level[level] += sst->data_size(); + if (level == 0) { + l0_new_ssts.push_back(std::move(sst)); + } } - } - - virtual void remove_sstable(sstables::shared_sstable sst) override { - auto level = sst->get_sstable_level(); - _size_per_level[level] -= sst->data_size(); - if (level == 0) { - _l0_scts.remove_sstable(sst); + for (auto& sst : old_ssts) { + auto level = sst->get_sstable_level(); + _size_per_level[level] -= sst->data_size(); + if (level == 0) { + l0_old_ssts.push_back(std::move(sst)); + } } + _l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts)); } }; @@ -309,16 +324,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker:: virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return compaction_controller::disable_backlog; } - virtual void add_sstable(sstables::shared_sstable sst) override { } - virtual void remove_sstable(sstables::shared_sstable sst) override { } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; struct null_backlog_tracker final : public compaction_backlog_tracker::impl { virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return 0; } - virtual void add_sstable(sstables::shared_sstable sst) override { } - virtual void remove_sstable(sstables::shared_sstable sst) override { } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; // Just so that if we have more than one CF with NullStrategy, we don't create a lot diff --git a/compaction/size_tiered_backlog_tracker.hh b/compaction/size_tiered_backlog_tracker.hh index 67ca3aa005..b0de962294 100644 --- a/compaction/size_tiered_backlog_tracker.hh +++ b/compaction/size_tiered_backlog_tracker.hh @@ -82,11 +82,9 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp public: virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override; - virtual void add_sstable(sstables::shared_sstable sst) override; - // Removing could be the result of a failure of an in progress write, successful finish of a // compaction, or some one-off operation, like drop - virtual void remove_sstable(sstables::shared_sstable sst) override; + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override; int64_t total_bytes() const { return _total_bytes; diff --git a/replica/table.cc b/replica/table.cc index e4ede8d23a..828f13f052 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -368,21 +368,16 @@ void table::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) no } inline void table::add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) { - tracker.add_sstable(std::move(sstable)); + tracker.replace_sstables({}, {std::move(sstable)}); } inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) { - tracker.remove_sstable(std::move(sstable)); + tracker.replace_sstables({std::move(sstable)}, {}); } void table::backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables) { auto& tracker = _compaction_strategy.get_backlog_tracker(); - for (auto& sst : new_sstables) { - tracker.add_sstable(sst); - } - for (auto& sst : old_sstables) { - tracker.remove_sstable(sst); - } + tracker.replace_sstables(old_sstables, new_sstables); } lw_shared_ptr diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index f46b07dc4f..1d2253cab0 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3209,7 +3209,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy }; for (auto& sst : ssts) { - cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst); + cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst}); } // Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog. @@ -3220,7 +3220,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy // set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker. cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window); for (auto& sst : ssts) { - cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst); + cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst}); } auto ret = fut.get0();