Merge 'Make replace sstables implementations exception safe' from Benny Halevy

This is the first phase of providing strong exception safety guarantees in the generic `compaction_backlog_tracker::replace_sstables`.

Each compaction strategy's backlog tracker must make its replace_sstables strongly exception safe: it may throw an exception, but on error it must revert any intermediate changes it made, restoring the tracker to its pre-update state.

Once this series is merged and the ICS replace_sstables is also made strongly exception safe (using the infrastructure introduced here in size_tiered_backlog_tracker), `compaction_backlog_tracker::replace_sstables` can let exceptions propagate back to the caller rather than disabling the backlog tracker on errors.
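
As context for the diffs below, the trackers achieve this with the classic copy/update/commit idiom: do all fallible work on temporaries, then publish the result with operations that cannot throw. A minimal standalone sketch of that idiom (simplified stand-in types, not the actual Scylla classes):

```cpp
// Minimal standalone sketch of the copy/update/commit idiom; the types are
// simplified stand-ins, not the actual Scylla classes.
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>

struct sstable { int64_t data_size; };
using shared_sstable = std::shared_ptr<sstable>;

class tracker {
    std::unordered_set<shared_sstable> _all;
    int64_t _total_bytes = 0;
public:
    // Strong exception safety: every operation that can fail (copies,
    // hash-table allocations) runs on temporaries, so a throw anywhere
    // leaves *this exactly as it was.
    void replace_sstables(const std::vector<shared_sstable>& old_ssts,
                          const std::vector<shared_sstable>& new_ssts) {
        auto tmp_all = _all;                    // may throw; live state untouched
        auto tmp_total_bytes = _total_bytes;
        for (const auto& sst : old_ssts) {
            if (tmp_all.erase(sst)) {
                tmp_total_bytes -= sst->data_size;
            }
        }
        for (const auto& sst : new_ssts) {
            if (tmp_all.insert(sst).second) {   // may throw; only the copy mutates
                tmp_total_bytes += sst->data_size;
            }
        }
        // Commit point: noexcept moves and a scalar store, all-or-nothing.
        // The noexcept lambda documents (and enforces, via std::terminate)
        // that nothing past this point may throw.
        std::invoke([&]() noexcept {
            _all = std::move(tmp_all);
            _total_bytes = tmp_total_bytes;
        });
    }
};
```

Note also that the new signatures take the sstable vectors by const reference; a failed call therefore leaves the caller's arguments intact as well.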

Closes #14104

* github.com:scylladb/scylladb:
  leveled_compaction_backlog_tracker: replace_sstables: provide strong exception safety guarantees
  time_window_backlog_tracker: replace_sstables: provide strong exception safety guarantees
  size_tiered_backlog_tracker: replace_sstables: provide strong exception safety guarantees
  size_tiered_backlog_tracker: provide static calculate_sstables_backlog_contribution
  size_tiered_backlog_tracker: make log4 helper static
  size_tiered_backlog_tracker: define struct sstables_backlog_contribution
  size_tiered_backlog_tracker: update_sstables: update total_bytes only if set changed
  compaction_backlog_tracker: replace_sstables: pass old and new sstables vectors by ref
  compaction_backlog_tracker: replace_sstables: add FIXME comments about strong exception safety
Committed by Avi Kivity on 2023-07-17 12:32:27 +03:00
5 changed files with 81 additions and 38 deletions


@@ -60,7 +60,8 @@ public:
using ongoing_compactions = std::unordered_map<sstables::shared_sstable, backlog_read_progress_manager*>;
struct impl {
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) = 0;
+ // FIXME: Should provide strong exception safety guarantees
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) = 0;
virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0;
virtual ~impl() { }
};
@@ -72,6 +73,7 @@ public:
~compaction_backlog_tracker();
double backlog() const;
+ // FIXME: Should provide strong exception safety guarantees
void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts);
void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp);
void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp);


@@ -275,7 +275,7 @@ private:
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return _added_backlog * _available_memory;
}
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {}
};
compaction::compaction_state& compaction_manager::get_compaction_state(table_state* t) {
@@ -1918,6 +1918,7 @@ void compaction_backlog_tracker::replace_sstables(const std::vector<sstables::sh
return ret;
};
+ // FIXME: propagate exception to caller once all replace_sstables implementations provide strong exception safety guarantees.
try {
_impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts));
} catch (...) {
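
For context, the FIXME above refers to the generic wrapper's current error handling: per the merge message, a failure disables the backlog tracker instead of reaching the caller. A standalone sketch contrasting the current behavior with the intended one (`tracker` and `impl_replace` are hypothetical stand-ins, not the real classes):

```cpp
// Standalone sketch contrasting the current swallow-and-disable error path
// with the intended propagation; simplified stand-ins, not the real classes.
#include <exception>
#include <functional>
#include <iostream>
#include <new>

struct tracker {
    bool disabled = false;
    std::function<void()> impl_replace; // stands in for _impl->replace_sstables

    // Today: any error disables the tracker; the caller never learns why.
    void replace_sstables_today() {
        try {
            impl_replace();
        } catch (...) {
            disabled = true;
        }
    }

    // Goal, once every impl is strongly exception safe: let the exception
    // propagate. The tracker is still in its pre-update state, so the
    // caller can handle the error and keep using it.
    void replace_sstables_later() {
        impl_replace();
    }
};

int main() {
    tracker t{.impl_replace = [] { throw std::bad_alloc(); }};
    t.replace_sstables_today();
    std::cout << "today: disabled=" << t.disabled << '\n';
    try {
        t.replace_sstables_later();
    } catch (const std::exception& e) {
        std::cout << "later: caller sees \"" << e.what() << "\"\n";
    }
}
```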


@@ -12,6 +12,8 @@
#include <vector>
#include <chrono>
#include <seastar/core/shared_ptr.hh>
#include "seastar/core/on_internal_error.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/sstables.hh"
#include "compaction.hh"
#include "compaction_strategy.hh"
@@ -107,7 +109,7 @@ size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker:
// A SSTable being compacted may not contribute to backlog if compaction strategy decided
// to perform a low-efficiency compaction when system is under little load, or when user
// performs major even though strategy is completely satisfied
- if (!_sstables_contributing_backlog.contains(crp.first)) {
+ if (!_contrib.sstables.contains(crp.first)) {
continue;
}
auto compacted = crp.second->compacted();
@@ -117,11 +119,11 @@ size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker:
return in;
}
- void size_tiered_backlog_tracker::refresh_sstables_backlog_contribution() {
- _sstables_backlog_contribution = 0.0f;
- _sstables_contributing_backlog = {};
- if (_all.empty()) {
- return;
+ // Provides strong exception safety guarantees.
+ size_tiered_backlog_tracker::sstables_backlog_contribution size_tiered_backlog_tracker::calculate_sstables_backlog_contribution(const std::vector<sstables::shared_sstable>& all, const sstables::size_tiered_compaction_strategy_options& stcs_options) {
+ sstables_backlog_contribution contrib;
+ if (all.empty()) {
+ return contrib;
}
using namespace sstables;
@@ -131,25 +133,27 @@ void size_tiered_backlog_tracker::refresh_sstables_backlog_contribution() {
// in efficient jobs acting more aggressive than they really have to.
// TODO: potentially switch to compaction manager's fan-in threshold, so to account for the dynamic
// fan-in threshold behavior.
const auto& newest_sst = std::ranges::max(_all, std::less<generation_type>(), std::mem_fn(&sstable::generation));
const auto& newest_sst = std::ranges::max(all, std::less<generation_type>(), std::mem_fn(&sstable::generation));
auto threshold = newest_sst->get_schema()->min_compaction_threshold();
for (auto& bucket : size_tiered_compaction_strategy::get_buckets(boost::copy_range<std::vector<shared_sstable>>(_all), _stcs_options)) {
for (auto& bucket : size_tiered_compaction_strategy::get_buckets(all, stcs_options)) {
if (!size_tiered_compaction_strategy::is_bucket_interesting(bucket, threshold)) {
continue;
}
- _sstables_backlog_contribution += boost::accumulate(bucket | boost::adaptors::transformed([this] (const shared_sstable& sst) -> double {
+ contrib.value += boost::accumulate(bucket | boost::adaptors::transformed([] (const shared_sstable& sst) -> double {
return sst->data_size() * log4(sst->data_size());
}), double(0.0f));
- // Controller is disabled if exception is caught during add / remove calls, so not making any effort to make this exception safe
- _sstables_contributing_backlog.insert(bucket.begin(), bucket.end());
+ contrib.sstables.insert(bucket.begin(), bucket.end());
}
+ return contrib;
}
double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const {
inflight_component compacted = compacted_backlog(oc);
- auto total_backlog_bytes = boost::accumulate(_sstables_contributing_backlog | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0));
+ auto total_backlog_bytes = boost::accumulate(_contrib.sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0));
// Bail out if effective backlog is zero, which happens in a small window where ongoing compaction exhausted
// input files but is still sealing output files or doing managerial stuff like updating history table
@@ -166,26 +170,41 @@ double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::on
auto effective_backlog_bytes = total_backlog_bytes - compacted.total_bytes;
// Sum of (Si - Ci) * log (Si) for all SSTables contributing backlog
- auto sstables_contribution = _sstables_backlog_contribution - compacted.contribution;
+ auto sstables_contribution = _contrib.value - compacted.contribution;
// This is subtracting ((Si - Ci) * log (Si)) from ((Si - Ci) * log(T)), yielding the final backlog
auto b = (effective_backlog_bytes * log4(_total_bytes)) - sstables_contribution;
return b > 0 ? b : 0;
}
- void size_tiered_backlog_tracker::replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) {
+ // Provides strong exception safety guarantees.
+ void size_tiered_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) {
+ auto tmp_all = _all;
+ auto tmp_total_bytes = _total_bytes;
+ tmp_all.reserve(_all.size() + new_ssts.size());
for (auto& sst : old_ssts) {
if (sst->data_size() > 0) {
- _total_bytes -= sst->data_size();
- _all.erase(sst);
+ auto erased = tmp_all.erase(sst);
+ if (erased) {
+ tmp_total_bytes -= sst->data_size();
+ }
}
}
for (auto& sst : new_ssts) {
if (sst->data_size() > 0) {
- _total_bytes += sst->data_size();
- _all.insert(std::move(sst));
+ auto [_, inserted] = tmp_all.insert(sst);
+ if (inserted) {
+ tmp_total_bytes += sst->data_size();
+ }
}
}
- refresh_sstables_backlog_contribution();
+ auto tmp_contrib = calculate_sstables_backlog_contribution(boost::copy_range<std::vector<shared_sstable>>(tmp_all), _stcs_options);
+ std::invoke([&] () noexcept {
+ _all = std::move(tmp_all);
+ _total_bytes = tmp_total_bytes;
+ _contrib = std::move(tmp_contrib);
+ });
}
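
Aside: restated from the comments in backlog() above, with S_i the size of a backlog-contributing sstable, C_i the bytes already compacted out of it, T the tracker's grand total _total_bytes, and log4(x) = ln(x)/ln(4) per the static helper, the computed backlog is

    b = max(0, (Σ_i (S_i − C_i)) · log4(T) − Σ_i (S_i − C_i) · log4(S_i))
      = max(0, Σ_i (S_i − C_i) · log4(T / S_i))

The first product is effective_backlog_bytes * log4(_total_bytes) and the subtrahend is sstables_contribution; each sstable's remaining bytes are weighted by log4(T / S_i), roughly the number of size-quadrupling compaction rounds still ahead of it.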
namespace sstables {
@@ -263,23 +282,25 @@ public:
return b;
}
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
+ // Provides strong exception safety guarantees
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {
struct replacement {
std::vector<sstables::shared_sstable> old_ssts;
std::vector<sstables::shared_sstable> new_ssts;
};
std::unordered_map<api::timestamp_type, replacement> per_window_replacement;
+ auto tmp_windows = _windows;
for (auto& sst : new_ssts) {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
- if (!_windows.contains(bound)) {
- _windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
+ if (!tmp_windows.contains(bound)) {
+ tmp_windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
}
per_window_replacement[bound].new_ssts.push_back(std::move(sst));
}
for (auto& sst : old_ssts) {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
- if (_windows.contains(bound)) {
+ if (tmp_windows.contains(bound)) {
per_window_replacement[bound].old_ssts.push_back(std::move(sst));
}
}
@@ -287,12 +308,20 @@ public:
for (auto& [bound, r] : per_window_replacement) {
// All windows must exist here, as windows are created for new files and will
// remain alive as long as there's a single file in them
auto& w = _windows.at(bound);
w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts));
auto it = tmp_windows.find(bound);
if (it == tmp_windows.end()) {
on_internal_error(clogger, fmt::format("window for bound {} not found", bound));
}
auto& w = it->second;
w.replace_sstables(r.old_ssts, r.new_ssts);
if (w.total_bytes() <= 0) {
- _windows.erase(bound);
+ tmp_windows.erase(bound);
}
}
std::invoke([&] () noexcept {
_windows = std::move(tmp_windows);
});
}
};
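
The windowed tracker above gets the same guarantee by composition: group the replacement per window bound, apply it to a copy of the window map (each window delegates to a size_tiered_backlog_tracker, itself strongly exception safe), and publish the map with one noexcept move. A condensed standalone sketch (simplified stand-in types, not the real classes):

```cpp
// Condensed standalone sketch of the per-window pattern: group the
// replacement by window bound, mutate a *copy* of the window map, and
// publish it with one noexcept move. Simplified stand-ins, not real classes.
#include <cstdint>
#include <functional>
#include <map>
#include <utility>
#include <vector>

struct window_tracker {
    int64_t total_bytes = 0;
    // Assumed strongly exception safe, like size_tiered_backlog_tracker.
    void replace(const std::vector<int64_t>& old_sizes,
                 const std::vector<int64_t>& new_sizes) {
        auto tmp = total_bytes;
        for (auto s : old_sizes) tmp -= s;
        for (auto s : new_sizes) tmp += s;
        total_bytes = tmp;
    }
};

struct windowed_tracker {
    using replacement = std::pair<std::vector<int64_t>, std::vector<int64_t>>;
    std::map<int64_t, window_tracker> _windows;

    void replace_sstables(const std::map<int64_t, replacement>& per_window) {
        auto tmp_windows = _windows;          // may throw; live map untouched
        for (const auto& [bound, r] : per_window) {
            auto& w = tmp_windows[bound];     // create the window on demand
            w.replace(r.first, r.second);     // may throw; only the copy mutates
            if (w.total_bytes <= 0) {
                tmp_windows.erase(bound);     // drop now-empty windows
            }
        }
        std::invoke([&]() noexcept {          // commit: one noexcept move
            _windows = std::move(tmp_windows);
        });
    }
};
```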
@@ -392,25 +421,31 @@ public:
return b;
}
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
+ // Provides strong exception safety guarantees
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {
+ auto tmp_size_per_level = _size_per_level;
std::vector<sstables::shared_sstable> l0_old_ssts, l0_new_ssts;
for (auto& sst : new_ssts) {
auto level = sst->get_sstable_level();
- _size_per_level[level] += sst->data_size();
+ tmp_size_per_level[level] += sst->data_size();
if (level == 0) {
l0_new_ssts.push_back(std::move(sst));
}
}
for (auto& sst : old_ssts) {
auto level = sst->get_sstable_level();
- _size_per_level[level] -= sst->data_size();
+ tmp_size_per_level[level] -= sst->data_size();
if (level == 0) {
l0_old_ssts.push_back(std::move(sst));
}
}
if (l0_old_ssts.size() || l0_new_ssts.size()) {
+ // stcs replace_sstables guarantees strong exception safety
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
}
std::invoke([&] () noexcept {
_size_per_level = std::move(tmp_size_per_level);
});
}
};
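
In the leveled tracker above, the notable detail is ordering: the fallible call into the strongly exception safe L0 size-tiered tracker happens before the noexcept commit of the copied per-level sizes, so a throw anywhere leaves both pieces of state untouched. A minimal standalone sketch (simplified stand-ins, not the real classes):

```cpp
// Minimal standalone sketch of composing two strongly exception safe
// updates, as the leveled tracker does; simplified stand-ins.
#include <array>
#include <cstdint>
#include <functional>

struct l0_tracker {
    int64_t bytes = 0;
    void replace(int64_t delta) { bytes += delta; } // assumed all-or-nothing
};

struct leveled_tracker {
    std::array<int64_t, 9> _size_per_level{};
    l0_tracker _l0;

    void replace_sstables(int level, int64_t delta) {
        auto tmp = _size_per_level;   // fallible work on a copy
        tmp[level] += delta;
        if (level == 0) {
            _l0.replace(delta);       // may throw, but commits atomically itself
        }
        std::invoke([&]() noexcept {  // only after the delegate succeeded
            _size_per_level = tmp;
        });
    }
};
```

Had the outer commit been able to throw after the delegate succeeded, the two states could diverge; the noexcept block is what keeps the composition all-or-nothing.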
@@ -418,14 +453,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return compaction_controller::disable_backlog;
}
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {}
};
struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return 0;
}
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override {}
};
//


@@ -64,10 +64,14 @@
// certain point in time, whose size is the amount of bytes currently written. So all we need
// to do is keep track of them too, and add the current estimate to the static part of (4).
class size_tiered_backlog_tracker final : public compaction_backlog_tracker::impl {
+ struct sstables_backlog_contribution {
+ double value = 0.0f;
+ std::unordered_set<sstables::shared_sstable> sstables;
+ };
sstables::size_tiered_compaction_strategy_options _stcs_options;
int64_t _total_bytes = 0;
- double _sstables_backlog_contribution = 0.0f;
- std::unordered_set<sstables::shared_sstable> _sstables_contributing_backlog;
+ sstables_backlog_contribution _contrib;
std::unordered_set<sstables::shared_sstable> _all;
struct inflight_component {
@@ -77,12 +81,12 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp
inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const;
- double log4(double x) const {
+ static double log4(double x) {
double inv_log_4 = 1.0f / std::log(4);
return log(x) * inv_log_4;
}
- void refresh_sstables_backlog_contribution();
+ static sstables_backlog_contribution calculate_sstables_backlog_contribution(const std::vector<sstables::shared_sstable>& all, const sstables::size_tiered_compaction_strategy_options& stcs_options);
public:
size_tiered_backlog_tracker(sstables::size_tiered_compaction_strategy_options stcs_options) : _stcs_options(stcs_options) {}
@@ -90,7 +94,8 @@ public:
// Removing could be the result of a failure of an in progress write, successful finish of a
// compaction, or some one-off operation, like drop
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override;
+ // Provides strong exception safety guarantees.
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override;
int64_t total_bytes() const {
return _total_bytes;


@@ -922,7 +922,7 @@ void consume_sstables(schema_ptr schema, reader_permit permit, std::vector<sstab
class scylla_sstable_table_state : public compaction::table_state {
struct dummy_compaction_backlog_tracker : public compaction_backlog_tracker::impl {
- virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override { }
+ virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override { }
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return 0.0; }
};