diff --git a/compaction/compaction_backlog_manager.hh b/compaction/compaction_backlog_manager.hh
index dc13819b3e..13ad1724d3 100644
--- a/compaction/compaction_backlog_manager.hh
+++ b/compaction/compaction_backlog_manager.hh
@@ -60,7 +60,8 @@ public:
     using ongoing_compactions = std::unordered_map<sstables::shared_sstable, backlog_read_progress_manager*>;
 
     struct impl {
-        virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) = 0;
+        // FIXME: Should provide strong exception safety guarantees
+        virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) = 0;
         virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0;
         virtual ~impl() { }
     };
@@ -72,6 +73,7 @@ public:
     ~compaction_backlog_tracker();
 
     double backlog() const;
+    // FIXME: Should provide strong exception safety guarantees
     void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts);
     void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp);
     void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp);
diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
index 409813c767..59eed477a0 100644
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -275,7 +275,7 @@ private:
     virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
         return _added_backlog * _available_memory;
     }
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {}
 };
 
 compaction::compaction_state& compaction_manager::get_compaction_state(table_state* t) {
@@ -1918,6 +1918,7 @@ void compaction_backlog_tracker::replace_sstables(const std::vector
         _impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts));
     } catch (...) {
diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc
index a86db5cd24..f6963e08fc 100644
--- a/compaction/compaction_strategy.cc
+++ b/compaction/compaction_strategy.cc
@@ -12,6 +12,8 @@
 #include
 #include
 #include
+#include "seastar/core/on_internal_error.hh"
+#include "sstables/shared_sstable.hh"
 #include "sstables/sstables.hh"
 #include "compaction.hh"
 #include "compaction_strategy.hh"
@@ -107,7 +109,7 @@ size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker:
         // A SSTable being compacted may not contribute to backlog if compaction strategy decided
         // to perform a low-efficiency compaction when system is under little load, or when user
         // performs major even though strategy is completely satisfied
-        if (!_sstables_contributing_backlog.contains(crp.first)) {
+        if (!_contrib.sstables.contains(crp.first)) {
             continue;
         }
         auto compacted = crp.second->compacted();
@@ -117,11 +119,11 @@ size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker:
     return in;
 }
 
-void size_tiered_backlog_tracker::refresh_sstables_backlog_contribution() {
-    _sstables_backlog_contribution = 0.0f;
-    _sstables_contributing_backlog = {};
-    if (_all.empty()) {
-        return;
+// Provides strong exception safety guarantees.
+size_tiered_backlog_tracker::sstables_backlog_contribution size_tiered_backlog_tracker::calculate_sstables_backlog_contribution(const std::vector<sstables::shared_sstable>& all, const sstables::size_tiered_compaction_strategy_options& stcs_options) {
+    sstables_backlog_contribution contrib;
+    if (all.empty()) {
+        return contrib;
     }
 
     using namespace sstables;
@@ -131,25 +133,27 @@ void size_tiered_backlog_tracker::refresh_sstables_backlog_contribution() {
     // in efficient jobs acting more aggressive than they really have to.
     // TODO: potentially switch to compaction manager's fan-in threshold, so to account for the dynamic
     // fan-in threshold behavior.
-    const auto& newest_sst = std::ranges::max(_all, std::less(), std::mem_fn(&sstable::generation));
+    const auto& newest_sst = std::ranges::max(all, std::less(), std::mem_fn(&sstable::generation));
     auto threshold = newest_sst->get_schema()->min_compaction_threshold();
 
-    for (auto& bucket : size_tiered_compaction_strategy::get_buckets(boost::copy_range<std::vector<sstables::shared_sstable>>(_all), _stcs_options)) {
+    for (auto& bucket : size_tiered_compaction_strategy::get_buckets(all, stcs_options)) {
         if (!size_tiered_compaction_strategy::is_bucket_interesting(bucket, threshold)) {
             continue;
         }
-        _sstables_backlog_contribution += boost::accumulate(bucket | boost::adaptors::transformed([this] (const shared_sstable& sst) -> double {
+        contrib.value += boost::accumulate(bucket | boost::adaptors::transformed([] (const shared_sstable& sst) -> double {
             return sst->data_size() * log4(sst->data_size());
         }), double(0.0f));
         // Controller is disabled if exception is caught during add / remove calls, so not making any effort to make this exception safe
-        _sstables_contributing_backlog.insert(bucket.begin(), bucket.end());
+        contrib.sstables.insert(bucket.begin(), bucket.end());
     }
+
+    return contrib;
 }
 
 double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const {
     inflight_component compacted = compacted_backlog(oc);
 
-    auto total_backlog_bytes = boost::accumulate(_sstables_contributing_backlog | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0));
+    auto total_backlog_bytes = boost::accumulate(_contrib.sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0));
 
     // Bail out if effective backlog is zero, which happens in a small window where ongoing compaction exhausted
     // input files but is still sealing output files or doing managerial stuff like updating history table
@@ -166,26 +170,41 @@ double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::on
     auto effective_backlog_bytes = total_backlog_bytes - compacted.total_bytes;
 
     // Sum of (Si - Ci) * log (Si) for all SSTables contributing backlog
-    auto sstables_contribution = _sstables_backlog_contribution - compacted.contribution;
+    auto sstables_contribution = _contrib.value - compacted.contribution;
     // This is subtracting ((Si - Ci) * log (Si)) from ((Si - Ci) * log(T)), yielding the final backlog
     auto b = (effective_backlog_bytes * log4(_total_bytes)) - sstables_contribution;
     return b > 0 ? b : 0;
 }
 
-void size_tiered_backlog_tracker::replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) {
+// Provides strong exception safety guarantees.
+void size_tiered_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) {
+    auto tmp_all = _all;
+    auto tmp_total_bytes = _total_bytes;
+    tmp_all.reserve(_all.size() + new_ssts.size());
+
     for (auto& sst : old_ssts) {
         if (sst->data_size() > 0) {
-            _total_bytes -= sst->data_size();
-            _all.erase(sst);
+            auto erased = tmp_all.erase(sst);
+            if (erased) {
+                tmp_total_bytes -= sst->data_size();
+            }
         }
     }
 
     for (auto& sst : new_ssts) {
         if (sst->data_size() > 0) {
-            _total_bytes += sst->data_size();
-            _all.insert(std::move(sst));
+            auto [_, inserted] = tmp_all.insert(sst);
+            if (inserted) {
+                tmp_total_bytes += sst->data_size();
+            }
         }
     }
-    refresh_sstables_backlog_contribution();
+    auto tmp_contrib = calculate_sstables_backlog_contribution(boost::copy_range<std::vector<sstables::shared_sstable>>(tmp_all), _stcs_options);
+
+    std::invoke([&] () noexcept {
+        _all = std::move(tmp_all);
+        _total_bytes = tmp_total_bytes;
+        _contrib = std::move(tmp_contrib);
+    });
 }
 
 namespace sstables {
@@ -263,23 +282,25 @@ public:
         return b;
     }
 
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
+    // Provides strong exception safety guarantees
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {
        struct replacement {
            std::vector<sstables::shared_sstable> old_ssts;
            std::vector<sstables::shared_sstable> new_ssts;
        };
        std::unordered_map<api::timestamp_type, replacement> per_window_replacement;
+       auto tmp_windows = _windows;
        for (auto& sst : new_ssts) {
            auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
-           if (!_windows.contains(bound)) {
-               _windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
+           if (!tmp_windows.contains(bound)) {
+               tmp_windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
            }
            per_window_replacement[bound].new_ssts.push_back(std::move(sst));
        }
 
        for (auto& sst : old_ssts) {
            auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
-           if (_windows.contains(bound)) {
+           if (tmp_windows.contains(bound)) {
                per_window_replacement[bound].old_ssts.push_back(std::move(sst));
            }
        }
@@ -287,12 +308,20 @@ public:
        for (auto& [bound, r] : per_window_replacement) {
            // All windows must exist here, as windows are created for new files and will
            // remain alive as long as there's a single file in them
-           auto& w = _windows.at(bound);
-           w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts));
+           auto it = tmp_windows.find(bound);
+           if (it == tmp_windows.end()) {
+               on_internal_error(clogger, fmt::format("window for bound {} not found", bound));
+           }
+           auto& w = it->second;
+           w.replace_sstables(r.old_ssts, r.new_ssts);
            if (w.total_bytes() <= 0) {
-               _windows.erase(bound);
+               tmp_windows.erase(bound);
            }
        }
+
+       std::invoke([&] () noexcept {
+           _windows = std::move(tmp_windows);
+       });
    }
 };
@@ -392,25 +421,31 @@ public:
         return b;
     }
 
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
+    // Provides strong exception safety guarantees
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {
+        auto tmp_size_per_level = _size_per_level;
         std::vector<sstables::shared_sstable> l0_old_ssts, l0_new_ssts;
 
         for (auto& sst : new_ssts) {
             auto level = sst->get_sstable_level();
-            _size_per_level[level] += sst->data_size();
+            tmp_size_per_level[level] += sst->data_size();
             if (level == 0) {
                 l0_new_ssts.push_back(std::move(sst));
             }
         }
 
         for (auto& sst : old_ssts) {
             auto level = sst->get_sstable_level();
-            _size_per_level[level] -= sst->data_size();
+            tmp_size_per_level[level] -= sst->data_size();
             if (level == 0) {
                 l0_old_ssts.push_back(std::move(sst));
             }
         }
 
         if (l0_old_ssts.size() || l0_new_ssts.size()) {
+            // stcs replace_sstables guarantees strong exception safety
             _l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
         }
+        std::invoke([&] () noexcept {
+            _size_per_level = std::move(tmp_size_per_level);
+        });
     }
 };
@@ -418,14 +453,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::
     virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
         return compaction_controller::disable_backlog;
     }
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {}
 };
 
 struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
     virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
         return 0;
     }
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {}
 };
 
 //
diff --git a/compaction/size_tiered_backlog_tracker.hh b/compaction/size_tiered_backlog_tracker.hh
index 3165f3bf64..61f2123c7a 100644
--- a/compaction/size_tiered_backlog_tracker.hh
+++ b/compaction/size_tiered_backlog_tracker.hh
@@ -64,10 +64,14 @@
 // certain point in time, whose size is the amount of bytes currently written. So all we need
 // to do is keep track of them too, and add the current estimate to the static part of (4).
 class size_tiered_backlog_tracker final : public compaction_backlog_tracker::impl {
+    struct sstables_backlog_contribution {
+        double value = 0.0f;
+        std::unordered_set<sstables::shared_sstable> sstables;
+    };
+
     sstables::size_tiered_compaction_strategy_options _stcs_options;
     int64_t _total_bytes = 0;
-    double _sstables_backlog_contribution = 0.0f;
-    std::unordered_set<sstables::shared_sstable> _sstables_contributing_backlog;
+    sstables_backlog_contribution _contrib;
     std::unordered_set<sstables::shared_sstable> _all;
 
     struct inflight_component {
@@ -77,12 +81,12 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp
     inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const;
 
-    double log4(double x) const {
+    static double log4(double x) {
         double inv_log_4 = 1.0f / std::log(4);
         return log(x) * inv_log_4;
     }
 
-    void refresh_sstables_backlog_contribution();
+    static sstables_backlog_contribution calculate_sstables_backlog_contribution(const std::vector<sstables::shared_sstable>& all, const sstables::size_tiered_compaction_strategy_options& stcs_options);
 public:
     size_tiered_backlog_tracker(sstables::size_tiered_compaction_strategy_options stcs_options) : _stcs_options(stcs_options) {}
 
@@ -90,7 +94,8 @@ public:
     // Removing could be the result of a failure of an in progress write, successful finish of a
     // compaction, or some one-off operation, like drop
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override;
+    // Provides strong exception safety guarantees.
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override;
 
     int64_t total_bytes() const {
         return _total_bytes;
diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc
index b067b7f5d8..fc1ecd304e 100644
--- a/tools/scylla-sstable.cc
+++ b/tools/scylla-sstable.cc
@@ -922,7 +922,7 @@ void consume_sstables(schema_ptr schema, reader_permit permit, std::vector
-    virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override { }
+    virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override { }
     virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
         return 0.0;
     }
 };
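
Every tracker in this patch follows the same copy-then-commit idiom for strong exception safety: copy the member state the update touches, do all throwing work (erase/insert, recomputing the contribution, creating windows) on the copies, and only publish the copies into the members from a noexcept block, so a failure anywhere leaves the tracker exactly as it was before the call. Below is a minimal standalone sketch of that idiom; it is illustrative only, not part of the patch, and the tracker type, its members, and the integer ids standing in for sstables are hypothetical.

#include <cstdint>
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical tracker illustrating the copy-then-commit idiom used by the
// patch: all throwing work happens on temporaries, the commit is noexcept.
class tracker {
    std::unordered_set<int64_t> _all;   // stand-in for the tracked sstable set
    int64_t _total_bytes = 0;

public:
    // Provides the strong exception safety guarantee: if any step throws,
    // *this is left exactly as it was before the call.
    void replace(const std::vector<std::pair<int64_t, int64_t>>& old_items,
                 const std::vector<std::pair<int64_t, int64_t>>& new_items) {
        // 1. Copy the state that will be modified. Copying may throw, but
        //    only the temporaries are affected.
        auto tmp_all = _all;
        auto tmp_total_bytes = _total_bytes;

        // 2. Apply the update to the copies. Any exception here propagates
        //    without touching the members.
        for (const auto& [id, size] : old_items) {
            if (tmp_all.erase(id)) {
                tmp_total_bytes -= size;
            }
        }
        for (const auto& [id, size] : new_items) {
            if (tmp_all.insert(id).second) {
                tmp_total_bytes += size;
            }
        }

        // 3. Commit. Moving containers and assigning integers cannot throw;
        //    the noexcept lambda makes that assumption explicit and turns a
        //    violation into std::terminate instead of a half-applied update.
        std::invoke([&] () noexcept {
            _all = std::move(tmp_all);
            _total_bytes = tmp_total_bytes;
        });
    }

    int64_t total_bytes() const { return _total_bytes; }
};

Wrapping the commit in std::invoke over a noexcept lambda, as the patch does, documents and enforces that the publish step is not allowed to throw; a plain sequence of move-assignments would behave the same but would not make that assumption visible.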