diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 4cd1c28b7d..9206c6db75 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -12,6 +12,7 @@ #include #include #include +#include "seastar/core/on_internal_error.hh" #include "sstables/shared_sstable.hh" #include "sstables/sstables.hh" #include "compaction.hh" @@ -283,24 +284,25 @@ public: return b; } - // FIXME: Should provide strong exception safety guarantees + // Provides strong exception safety guarantees virtual void replace_sstables(const std::vector& old_ssts, const std::vector& new_ssts) override { struct replacement { std::vector old_ssts; std::vector new_ssts; }; std::unordered_map per_window_replacement; + auto tmp_windows = _windows; for (auto& sst : new_ssts) { auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); - if (!_windows.contains(bound)) { - _windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options)); + if (!tmp_windows.contains(bound)) { + tmp_windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options)); } per_window_replacement[bound].new_ssts.push_back(std::move(sst)); } for (auto& sst : old_ssts) { auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); - if (_windows.contains(bound)) { + if (tmp_windows.contains(bound)) { per_window_replacement[bound].old_ssts.push_back(std::move(sst)); } } @@ -308,12 +310,20 @@ public: for (auto& [bound, r] : per_window_replacement) { // All windows must exist here, as windows are created for new files and will // remain alive as long as there's a single file in them - auto& w = _windows.at(bound); - w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts)); + auto it = tmp_windows.find(bound); + if (it == tmp_windows.end()) { + on_internal_error(clogger, fmt::format("window for bound {} not found", bound)); + } + auto& w = it->second; + w.replace_sstables(r.old_ssts, r.new_ssts); if (w.total_bytes() <= 0) { - _windows.erase(bound); + tmp_windows.erase(bound); } } + + std::invoke([&] () noexcept { + _windows = std::move(tmp_windows); + }); } };