time_window_backlog_tracker: replace_sstables: provide strong exception safety guarantees

Modify a temporary copy of the `_windows` map
and move-assign it back atomically when done.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-05-31 16:03:51 +03:00
parent 635c564a9d
commit 39d4b548fc

View File

@@ -12,6 +12,7 @@
#include <vector>
#include <chrono>
#include <seastar/core/shared_ptr.hh>
#include "seastar/core/on_internal_error.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/sstables.hh"
#include "compaction.hh"
@@ -283,24 +284,25 @@ public:
return b;
}
// FIXME: Should provide strong exception safety guarantees
// Provides strong exception safety guarantees
virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {
struct replacement {
std::vector<sstables::shared_sstable> old_ssts;
std::vector<sstables::shared_sstable> new_ssts;
};
std::unordered_map<api::timestamp_type, replacement> per_window_replacement;
auto tmp_windows = _windows;
for (auto& sst : new_ssts) {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
if (!_windows.contains(bound)) {
_windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
if (!tmp_windows.contains(bound)) {
tmp_windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
}
per_window_replacement[bound].new_ssts.push_back(std::move(sst));
}
for (auto& sst : old_ssts) {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
if (_windows.contains(bound)) {
if (tmp_windows.contains(bound)) {
per_window_replacement[bound].old_ssts.push_back(std::move(sst));
}
}
@@ -308,12 +310,20 @@ public:
for (auto& [bound, r] : per_window_replacement) {
// All windows must exist here, as windows are created for new files and will
// remain alive as long as there's a single file in them
auto& w = _windows.at(bound);
w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts));
auto it = tmp_windows.find(bound);
if (it == tmp_windows.end()) {
on_internal_error(clogger, fmt::format("window for bound {} not found", bound));
}
auto& w = it->second;
w.replace_sstables(r.old_ssts, r.new_ssts);
if (w.total_bytes() <= 0) {
_windows.erase(bound);
tmp_windows.erase(bound);
}
}
std::invoke([&] () noexcept {
_windows = std::move(tmp_windows);
});
}
};