Merge "Redefine Compaction Backlog to tame compaction aggressiveness" From Raphael S. Carvalho
" Problem statement ================= Today, compaction can act much more aggressive than it really has to, because the strategy and its definition of backlog are completely decoupled. The backlog definition for size-tiered, which is inherited by all strategies (e.g.: LCS L0, TWCS' windows), is built on the assumption that the world must reach the state of zero amplification. But that's unrealistic and goes against the intent amplification defined by the compaction strategy. For example, size tiered is a write oriented strategy which allows for extra space amplification for compaction to keep up with the high write rate. It can be seen today, in many deployments, that compaction shares is either close to 1000, or even stuck at 1000, even though there's nothing to be done, i.e. the compaction strategy is completely satisfied. When there's a single sstable per tier, for example. This means that whenever a new compaction job kicks in, it will act much more aggressive because of the high shares, caused by false backlog of the existing tables. This translates into higher P99 latencies and reduced throughput. Solution ======== This problem can be fixed, as proposed in the document "Fixing compaction aggressiveness due to suboptimal definition of zero backlog by controller" [1], by removing backlog of tiers that don't have to be compacted now, like a tier that has a single file. That's about coupling the strategy goal with the backlog definition. So once strategy becomes satisfied, so will the controller. Low-efficiency compaction, like compacting 2 files only or cross-tier, only happens when system is under little load and can proceed at a slower pace. Once efficient jobs show up, ongoing compactions, even if inefficient, will get more shares (as efficient jobs add to the backlog) so compaction won't fall behind. With this approach, throughput and latency is improved as cpu time is no longer stolen (unnecessarily) from the foreground requests. [1]: https://docs.google.com/document/d/1EQnXXGWg6z7VAwI4u8AaUX1vFduClaf6WOMt2wem5oQ Results ======= Test sequentially populates 3 tables and then run a mixed workload on them, where disk:memory ratio (usage) reaches ~30:1 at the peak. Please find graphs here: https://user-images.githubusercontent.com/1409139/153687219-32368a35-ac63-461b-a362-64dbe8449a00.png 1) Patched version started at ~01:30 2) On population phase, throughput increase and lower P99 write latency can be clearly observed. 3) On mixed phase, throughput increase and lower P99 write and read latency can also be clearly observed. 4) Compaction CPU time sometimes reach ~100% because of the delay between each loader. 5) On unpatched version, it can be seen that backlog keeps growing even when though strategies become satisfied, so compaction is using much more CPU time in comparison. Patched version correctly clears the backlog. Can also be found at: github.com/raphaelsc/scylla.git compaction-controller-v5 tests: UNIT(dev, debug). " * 'compaction-controller-v5' of https://github.com/raphaelsc/scylla: tests: Add compaction controller test test/lib/sstable_utils: Set bytes_on_disk for fake SSTables compaction/size_tiered_backlog_tracker.hh: Use unsigned type for inflight component compaction: Redefine compaction backlog to tame compaction aggressiveness compaction_backlog_tracker: Batch changes through a new replacement interface table: Disable backlog tracker when stopping table compaction_backlog_tracker: make disable() public compaction_backlog_tracker: Clear tracker state when disabled compaction: Add normalized backlog metric compaction: make size_tiered_compaction_strategy static
This commit is contained in:
@@ -60,8 +60,7 @@ public:
|
||||
using ongoing_compactions = std::unordered_map<sstables::shared_sstable, backlog_read_progress_manager*>;
|
||||
|
||||
struct impl {
|
||||
virtual void add_sstable(sstables::shared_sstable sst) = 0;
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) = 0;
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) = 0;
|
||||
virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0;
|
||||
virtual ~impl() { }
|
||||
};
|
||||
@@ -72,22 +71,21 @@ public:
|
||||
~compaction_backlog_tracker();
|
||||
|
||||
double backlog() const;
|
||||
void add_sstable(sstables::shared_sstable sst);
|
||||
void remove_sstable(sstables::shared_sstable sst);
|
||||
void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts);
|
||||
void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp);
|
||||
void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp);
|
||||
void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true);
|
||||
void revert_charges(sstables::shared_sstable sst);
|
||||
private:
|
||||
// Returns true if this SSTable can be added or removed from the tracker.
|
||||
bool sstable_belongs_to_tracker(const sstables::shared_sstable& sst);
|
||||
|
||||
void disable() {
|
||||
_disabled = true;
|
||||
_impl = {};
|
||||
_ongoing_writes = {};
|
||||
_ongoing_compactions = {};
|
||||
}
|
||||
bool _disabled = false;
|
||||
private:
|
||||
// Returns true if this SSTable can be added or removed from the tracker.
|
||||
bool sstable_belongs_to_tracker(const sstables::shared_sstable& sst);
|
||||
bool disabled() const noexcept { return !_impl; }
|
||||
std::unique_ptr<impl> _impl;
|
||||
// We keep track of this so that we can transfer to a new tracker if the compaction strategy is
|
||||
// changed in the middle of a compaction.
|
||||
|
||||
@@ -237,8 +237,7 @@ private:
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return _added_backlog * _available_memory;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
|
||||
};
|
||||
|
||||
compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) {
|
||||
@@ -480,6 +479,8 @@ void compaction_manager::register_metrics() {
|
||||
sm::description("Holds the number of compaction tasks waiting for an opportunity to run.")),
|
||||
sm::make_gauge("backlog", [this] { return _last_backlog; },
|
||||
sm::description("Holds the sum of compaction backlog for all tables in the system.")),
|
||||
sm::make_gauge("normalized_backlog", [this] { return _last_backlog / _available_memory; },
|
||||
sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1145,30 +1146,26 @@ compaction::strategy_control& compaction_manager::get_strategy_control() const n
|
||||
}
|
||||
|
||||
double compaction_backlog_tracker::backlog() const {
|
||||
return _disabled ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions);
|
||||
return disabled() ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions);
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled || !sstable_belongs_to_tracker(sst)) {
|
||||
void compaction_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) {
|
||||
if (disabled()) {
|
||||
return;
|
||||
}
|
||||
_ongoing_writes.erase(sst);
|
||||
try {
|
||||
_impl->add_sstable(std::move(sst));
|
||||
} catch (...) {
|
||||
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
|
||||
disable();
|
||||
}
|
||||
}
|
||||
auto filter_and_revert_charges = [this] (const std::vector<sstables::shared_sstable>& ssts) {
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
for (auto& sst : ssts) {
|
||||
if (sstable_belongs_to_tracker(sst)) {
|
||||
revert_charges(sst);
|
||||
ret.push_back(sst);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
|
||||
void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled || !sstable_belongs_to_tracker(sst)) {
|
||||
return;
|
||||
}
|
||||
|
||||
_ongoing_compactions.erase(sst);
|
||||
try {
|
||||
_impl->remove_sstable(std::move(sst));
|
||||
_impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts));
|
||||
} catch (...) {
|
||||
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
|
||||
disable();
|
||||
@@ -1180,7 +1177,7 @@ bool compaction_backlog_tracker::sstable_belongs_to_tracker(const sstables::shar
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp) {
|
||||
if (_disabled) {
|
||||
if (disabled()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
@@ -1195,7 +1192,7 @@ void compaction_backlog_tracker::register_partially_written_sstable(sstables::sh
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp) {
|
||||
if (_disabled) {
|
||||
if (disabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -91,23 +91,16 @@ compaction_strategy_impl::compaction_strategy_impl(const std::map<sstring, sstri
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
size_tiered_backlog_tracker::inflight_component
|
||||
size_tiered_backlog_tracker::partial_backlog(const compaction_backlog_tracker::ongoing_writes& ongoing_writes) const {
|
||||
inflight_component in;
|
||||
for (auto const& swp : ongoing_writes) {
|
||||
auto written = swp.second->written();
|
||||
if (written > 0) {
|
||||
in.total_bytes += written;
|
||||
in.contribution += written * log4(written);
|
||||
}
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
size_tiered_backlog_tracker::inflight_component
|
||||
size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const {
|
||||
inflight_component in;
|
||||
for (auto const& crp : ongoing_compactions) {
|
||||
// A SSTable being compacted may not contribute to backlog if compaction strategy decided
|
||||
// to perform a low-efficiency compaction when system is under little load, or when user
|
||||
// performs major even though strategy is completely satisfied
|
||||
if (!_sstables_contributing_backlog.contains(crp.first)) {
|
||||
continue;
|
||||
}
|
||||
auto compacted = crp.second->compacted();
|
||||
in.total_bytes += compacted;
|
||||
in.contribution += compacted * log4(crp.first->data_size());
|
||||
@@ -115,34 +108,75 @@ size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker:
|
||||
return in;
|
||||
}
|
||||
|
||||
void size_tiered_backlog_tracker::refresh_sstables_backlog_contribution() {
|
||||
_sstables_backlog_contribution = 0.0f;
|
||||
_sstables_contributing_backlog = {};
|
||||
if (_all.empty()) {
|
||||
return;
|
||||
}
|
||||
using namespace sstables;
|
||||
|
||||
// Deduce threshold from the last SSTable added to the set
|
||||
// Low-efficiency jobs, which fan-in is smaller than min-threshold, will not have backlog accounted.
|
||||
// That's because they can only run when system is under little load, and accounting them would result
|
||||
// in efficient jobs acting more aggressive than they really have to.
|
||||
// TODO: potentially switch to compaction manager's fan-in threshold, so to account for the dynamic
|
||||
// fan-in threshold behavior.
|
||||
const auto& newest_sst = std::ranges::max(_all, {}, std::mem_fn(&sstable::generation));
|
||||
auto threshold = newest_sst->get_schema()->min_compaction_threshold();
|
||||
|
||||
for (auto& bucket : size_tiered_compaction_strategy::get_buckets(boost::copy_range<std::vector<shared_sstable>>(_all), _stcs_options)) {
|
||||
if (!size_tiered_compaction_strategy::is_bucket_interesting(bucket, threshold)) {
|
||||
continue;
|
||||
}
|
||||
_sstables_backlog_contribution += boost::accumulate(bucket | boost::adaptors::transformed([this] (const shared_sstable& sst) -> double {
|
||||
return sst->data_size() * log4(sst->data_size());
|
||||
}), double(0.0f));
|
||||
// Controller is disabled if exception is caught during add / remove calls, so not making any effort to make this exception safe
|
||||
_sstables_contributing_backlog.insert(bucket.begin(), bucket.end());
|
||||
}
|
||||
}
|
||||
|
||||
double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const {
|
||||
inflight_component partial = partial_backlog(ow);
|
||||
inflight_component compacted = compacted_backlog(oc);
|
||||
|
||||
auto effective_total_size = _total_bytes + partial.total_bytes - compacted.total_bytes;
|
||||
if ((effective_total_size <= 0)) {
|
||||
auto total_backlog_bytes = boost::accumulate(_sstables_contributing_backlog | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0));
|
||||
|
||||
// Bail out if effective backlog is zero, which happens in a small window where ongoing compaction exhausted
|
||||
// input files but is still sealing output files or doing managerial stuff like updating history table
|
||||
if (total_backlog_bytes <= compacted.total_bytes) {
|
||||
return 0;
|
||||
}
|
||||
if (_total_bytes == 0) {
|
||||
return 0;
|
||||
}
|
||||
auto sstables_contribution = _sstables_backlog_contribution + partial.contribution - compacted.contribution;
|
||||
auto b = (effective_total_size * log4(_total_bytes)) - sstables_contribution;
|
||||
|
||||
// Formula for each SSTable is (Si - Ci) * log(T / Si)
|
||||
// Which can be rewritten as: ((Si - Ci) * log(T)) - ((Si - Ci) * log(Si))
|
||||
//
|
||||
// For the meaning of each variable, please refer to the doc in size_tiered_backlog_tracker.hh
|
||||
|
||||
// Sum of (Si - Ci) for all SSTables contributing backlog
|
||||
auto effective_backlog_bytes = total_backlog_bytes - compacted.total_bytes;
|
||||
|
||||
// Sum of (Si - Ci) * log (Si) for all SSTables contributing backlog
|
||||
auto sstables_contribution = _sstables_backlog_contribution - compacted.contribution;
|
||||
// This is subtracting ((Si - Ci) * log (Si)) from ((Si - Ci) * log(T)), yielding the final backlog
|
||||
auto b = (effective_backlog_bytes * log4(_total_bytes)) - sstables_contribution;
|
||||
return b > 0 ? b : 0;
|
||||
}
|
||||
|
||||
void size_tiered_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes += sst->data_size();
|
||||
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
|
||||
void size_tiered_backlog_tracker::replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) {
|
||||
for (auto& sst : old_ssts) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes -= sst->data_size();
|
||||
_all.erase(sst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes -= sst->data_size();
|
||||
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
|
||||
for (auto& sst : new_ssts) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes += sst->data_size();
|
||||
_all.insert(std::move(sst));
|
||||
}
|
||||
}
|
||||
refresh_sstables_backlog_contribution();
|
||||
}
|
||||
|
||||
namespace sstables {
|
||||
@@ -159,6 +193,7 @@ extern logging::logger clogger;
|
||||
// a new object for the partial write at this time.
|
||||
class time_window_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
time_window_compaction_strategy_options _twcs_options;
|
||||
size_tiered_compaction_strategy_options _stcs_options;
|
||||
std::unordered_map<api::timestamp_type, size_tiered_backlog_tracker> _windows;
|
||||
|
||||
api::timestamp_type lower_bound_of(api::timestamp_type timestamp) const {
|
||||
@@ -166,8 +201,9 @@ class time_window_backlog_tracker final : public compaction_backlog_tracker::imp
|
||||
return time_window_compaction_strategy::get_window_lower_bound(_twcs_options.sstable_window_size, ts);
|
||||
}
|
||||
public:
|
||||
time_window_backlog_tracker(time_window_compaction_strategy_options options)
|
||||
: _twcs_options(options)
|
||||
time_window_backlog_tracker(time_window_compaction_strategy_options twcs_options, size_tiered_compaction_strategy_options stcs_options)
|
||||
: _twcs_options(twcs_options)
|
||||
, _stcs_options(stcs_options)
|
||||
{}
|
||||
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
@@ -213,23 +249,39 @@ public:
|
||||
|
||||
// Partial writes that don't belong to any window are accounted here.
|
||||
for (auto& current : writes_per_window) {
|
||||
b += size_tiered_backlog_tracker().backlog(current.second, no_oc);
|
||||
b += size_tiered_backlog_tracker(_stcs_options).backlog(current.second, no_oc);
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
_windows[bound].add_sstable(sst);
|
||||
}
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
|
||||
struct replacement {
|
||||
std::vector<sstables::shared_sstable> old_ssts;
|
||||
std::vector<sstables::shared_sstable> new_ssts;
|
||||
};
|
||||
std::unordered_map<api::timestamp_type, replacement> per_window_replacement;
|
||||
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
auto it = _windows.find(bound);
|
||||
if (it != _windows.end()) {
|
||||
it->second.remove_sstable(sst);
|
||||
if (it->second.total_bytes() <= 0) {
|
||||
_windows.erase(it);
|
||||
for (auto& sst : new_ssts) {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
if (_windows.contains(bound)) {
|
||||
_windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options));
|
||||
}
|
||||
per_window_replacement[bound].new_ssts.push_back(std::move(sst));
|
||||
}
|
||||
for (auto& sst : old_ssts) {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
if (_windows.contains(bound)) {
|
||||
per_window_replacement[bound].old_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& [bound, r] : per_window_replacement) {
|
||||
// All windows must exist here, as windows are created for new files and will
|
||||
// remain alive as long as there's a single file in them
|
||||
auto& w = _windows.at(bound);
|
||||
w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts));
|
||||
if (w.total_bytes() <= 0) {
|
||||
_windows.erase(bound);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -242,8 +294,9 @@ class leveled_compaction_backlog_tracker final : public compaction_backlog_track
|
||||
std::vector<uint64_t> _size_per_level;
|
||||
uint64_t _max_sstable_size;
|
||||
public:
|
||||
leveled_compaction_backlog_tracker(int32_t max_sstable_size_in_mb)
|
||||
: _size_per_level(leveled_manifest::MAX_LEVELS, uint64_t(0))
|
||||
leveled_compaction_backlog_tracker(int32_t max_sstable_size_in_mb, size_tiered_compaction_strategy_options stcs_options)
|
||||
: _l0_scts(stcs_options)
|
||||
, _size_per_level(leveled_manifest::MAX_LEVELS, uint64_t(0))
|
||||
, _max_sstable_size(max_sstable_size_in_mb * 1024 * 1024)
|
||||
{}
|
||||
|
||||
@@ -288,20 +341,23 @@ public:
|
||||
return b;
|
||||
}
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] += sst->data_size();
|
||||
if (level == 0) {
|
||||
_l0_scts.add_sstable(sst);
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
|
||||
std::vector<sstables::shared_sstable> l0_old_ssts, l0_new_ssts;
|
||||
for (auto& sst : new_ssts) {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] += sst->data_size();
|
||||
if (level == 0) {
|
||||
l0_new_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] -= sst->data_size();
|
||||
if (level == 0) {
|
||||
_l0_scts.remove_sstable(sst);
|
||||
for (auto& sst : old_ssts) {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] -= sst->data_size();
|
||||
if (level == 0) {
|
||||
l0_old_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -309,16 +365,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return compaction_controller::disable_backlog;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
|
||||
};
|
||||
|
||||
struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return 0;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
|
||||
};
|
||||
|
||||
// Just so that if we have more than one CF with NullStrategy, we don't create a lot
|
||||
@@ -356,7 +410,7 @@ leveled_compaction_strategy::leveled_compaction_strategy(const std::map<sstring,
|
||||
: compaction_strategy_impl(options)
|
||||
, _max_sstable_size_in_mb(calculate_max_sstable_size_in_mb(compaction_strategy_impl::get_value(options, SSTABLE_SIZE_OPTION)))
|
||||
, _stcs_options(options)
|
||||
, _backlog_tracker(std::make_unique<leveled_compaction_backlog_tracker>(_max_sstable_size_in_mb))
|
||||
, _backlog_tracker(std::make_unique<leveled_compaction_backlog_tracker>(_max_sstable_size_in_mb, _stcs_options))
|
||||
{
|
||||
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
|
||||
}
|
||||
@@ -380,7 +434,7 @@ time_window_compaction_strategy::time_window_compaction_strategy(const std::map<
|
||||
: compaction_strategy_impl(options)
|
||||
, _options(options)
|
||||
, _stcs_options(options)
|
||||
, _backlog_tracker(std::make_unique<time_window_backlog_tracker>(_options))
|
||||
, _backlog_tracker(std::make_unique<time_window_backlog_tracker>(_options, _stcs_options))
|
||||
{
|
||||
if (!options.contains(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.contains(TOMBSTONE_THRESHOLD_OPTION)) {
|
||||
_disable_tombstone_compaction = true;
|
||||
@@ -582,12 +636,12 @@ compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compacti
|
||||
size_tiered_compaction_strategy::size_tiered_compaction_strategy(const std::map<sstring, sstring>& options)
|
||||
: compaction_strategy_impl(options)
|
||||
, _options(options)
|
||||
, _backlog_tracker(std::make_unique<size_tiered_backlog_tracker>())
|
||||
, _backlog_tracker(std::make_unique<size_tiered_backlog_tracker>(_options))
|
||||
{}
|
||||
|
||||
size_tiered_compaction_strategy::size_tiered_compaction_strategy(const size_tiered_compaction_strategy_options& options)
|
||||
: _options(options)
|
||||
, _backlog_tracker(std::make_unique<size_tiered_backlog_tracker>())
|
||||
, _backlog_tracker(std::make_unique<size_tiered_backlog_tracker>(_options))
|
||||
{}
|
||||
|
||||
compaction_strategy::compaction_strategy(::shared_ptr<compaction_strategy_impl> impl)
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
#pragma once
|
||||
#include "compaction_backlog_manager.hh"
|
||||
#include "size_tiered_compaction_strategy.hh"
|
||||
#include <cmath>
|
||||
#include <ctgmath>
|
||||
|
||||
@@ -63,30 +64,33 @@
|
||||
// certain point in time, whose size is the amount of bytes currently written. So all we need
|
||||
// to do is keep track of them too, and add the current estimate to the static part of (4).
|
||||
class size_tiered_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
sstables::size_tiered_compaction_strategy_options _stcs_options;
|
||||
int64_t _total_bytes = 0;
|
||||
double _sstables_backlog_contribution = 0.0f;
|
||||
std::unordered_set<sstables::shared_sstable> _sstables_contributing_backlog;
|
||||
std::unordered_set<sstables::shared_sstable> _all;
|
||||
|
||||
struct inflight_component {
|
||||
int64_t total_bytes = 0;
|
||||
uint64_t total_bytes = 0;
|
||||
double contribution = 0;
|
||||
};
|
||||
|
||||
inflight_component partial_backlog(const compaction_backlog_tracker::ongoing_writes& ongoing_writes) const;
|
||||
|
||||
inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const;
|
||||
|
||||
double log4(double x) const {
|
||||
double inv_log_4 = 1.0f / std::log(4);
|
||||
return log(x) * inv_log_4;
|
||||
}
|
||||
public:
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override;
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override;
|
||||
void refresh_sstables_backlog_contribution();
|
||||
public:
|
||||
size_tiered_backlog_tracker(sstables::size_tiered_compaction_strategy_options stcs_options) : _stcs_options(stcs_options) {}
|
||||
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override;
|
||||
|
||||
// Removing could be the result of a failure of an in progress write, successful finish of a
|
||||
// compaction, or some one-off operation, like drop
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override;
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override;
|
||||
|
||||
int64_t total_bytes() const {
|
||||
return _total_bytes;
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
|
||||
class size_tiered_backlog_tracker;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class size_tiered_compaction_strategy_options {
|
||||
@@ -106,7 +108,7 @@ class size_tiered_compaction_strategy : public compaction_strategy_impl {
|
||||
return n / sstables.size();
|
||||
}
|
||||
|
||||
bool is_bucket_interesting(const std::vector<sstables::shared_sstable>& bucket, int min_threshold) const {
|
||||
static bool is_bucket_interesting(const std::vector<sstables::shared_sstable>& bucket, int min_threshold) {
|
||||
return bucket.size() >= size_t(min_threshold);
|
||||
}
|
||||
|
||||
@@ -142,6 +144,7 @@ public:
|
||||
|
||||
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
|
||||
|
||||
friend class ::size_tiered_backlog_tracker;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -368,21 +368,16 @@ void table::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) no
|
||||
}
|
||||
|
||||
inline void table::add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
||||
tracker.add_sstable(std::move(sstable));
|
||||
tracker.replace_sstables({}, {std::move(sstable)});
|
||||
}
|
||||
|
||||
inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
||||
tracker.remove_sstable(std::move(sstable));
|
||||
tracker.replace_sstables({std::move(sstable)}, {});
|
||||
}
|
||||
|
||||
void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
|
||||
auto& tracker = _compaction_strategy.get_backlog_tracker();
|
||||
for (auto& sst : new_sstables) {
|
||||
tracker.add_sstable(sst);
|
||||
}
|
||||
for (auto& sst : old_sstables) {
|
||||
tracker.remove_sstable(sst);
|
||||
}
|
||||
tracker.replace_sstables(old_sstables, new_sstables);
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set>
|
||||
@@ -755,6 +750,7 @@ table::stop() {
|
||||
_sstables = make_compound_sstable_set();
|
||||
_sstables_staging.clear();
|
||||
})).then([this] {
|
||||
_compaction_strategy.get_backlog_tracker().disable();
|
||||
_cache.refresh_snapshot();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3209,7 +3209,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
|
||||
};
|
||||
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst});
|
||||
}
|
||||
|
||||
// Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog.
|
||||
@@ -3220,7 +3220,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
|
||||
// set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst});
|
||||
}
|
||||
|
||||
auto ret = fut.get0();
|
||||
@@ -4811,3 +4811,110 @@ SEASTAR_TEST_CASE(test_major_does_not_miss_data_in_memtable) {
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(simple_backlog_controller_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
/////////////
|
||||
// settings
|
||||
static constexpr float disk_memory_ratio = 78.125; /* AWS I3en is ~78.125 */
|
||||
static constexpr uint64_t available_memory_per_shard = 8'000'000'000; /* AWS I3en */
|
||||
static constexpr float target_disk_usage = 0.50;
|
||||
|
||||
const uint64_t available_disk_size_per_shard = disk_memory_ratio * available_memory_per_shard;
|
||||
const uint64_t available_memory = available_memory_per_shard * 0.92; /* 8% is reserved for the OS */
|
||||
const uint64_t estimated_flush_size = double(available_memory) * 0.05; /* flush threshold is 5% of available shard mem */
|
||||
const uint64_t all_tables_disk_usage = double(available_disk_size_per_shard) * target_disk_usage;
|
||||
|
||||
auto as = abort_source();
|
||||
compaction_manager::compaction_scheduling_group csg = { default_scheduling_group(), default_priority_class() };
|
||||
compaction_manager::maintenance_scheduling_group msg = { default_scheduling_group(), default_priority_class() };
|
||||
auto manager = compaction_manager(csg, msg, available_memory, as);
|
||||
|
||||
auto add_sstable = [&env, &manager, gen = make_lw_shared<unsigned>(1)] (replica::table& t, uint64_t data_size) {
|
||||
auto sst = env.make_sstable(t.schema(), "", (*gen)++, la, big);
|
||||
auto key = make_local_key(t.schema());
|
||||
sstables::test(sst).set_values_for_leveled_strategy(data_size, 0 /*level*/, 0 /*max ts*/, key, key);
|
||||
assert(sst->data_size() == data_size);
|
||||
auto backlog_before = t.get_compaction_strategy().get_backlog_tracker().backlog();
|
||||
t.add_sstable_and_update_cache(sst).get();
|
||||
testlog.debug("\tNew sstable of size={}; Backlog diff={};",
|
||||
sstables::pretty_printed_data_size(data_size),
|
||||
t.get_compaction_strategy().get_backlog_tracker().backlog() - backlog_before);
|
||||
};
|
||||
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto create_table = [&] () {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
|
||||
cfg.datadir = "";
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_cache = false;
|
||||
auto t = make_lw_shared<replica::table>(s, cfg, replica::table::no_commitlog(), manager, cl_stats, *tracker);
|
||||
t->mark_ready_for_writes();
|
||||
t->start();
|
||||
t->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
return t;
|
||||
};
|
||||
|
||||
auto get_size_for_tier = [&] (int tier) -> uint64_t {
|
||||
return std::pow(4, tier) * estimated_flush_size;
|
||||
};
|
||||
auto get_total_tiers = [&] (uint64_t target_size) -> unsigned {
|
||||
double inv_log_4 = 1.0f / std::log(4);
|
||||
return std::ceil(std::log(double(target_size) / estimated_flush_size) * inv_log_4);
|
||||
};
|
||||
auto normalize_backlog = [&] (double backlog) -> double {
|
||||
return backlog / available_memory;
|
||||
};
|
||||
|
||||
struct result {
|
||||
unsigned table_count;
|
||||
uint64_t per_table_max_disk_usage;
|
||||
double normalized_backlog;
|
||||
};
|
||||
std::vector<result> results;
|
||||
|
||||
std::vector<unsigned> target_table_count_s = { 1, 2, 5, 10, 20 };
|
||||
for (auto target_table_count : target_table_count_s) {
|
||||
const uint64_t per_table_max_disk_usage = std::ceil(all_tables_disk_usage / target_table_count);
|
||||
|
||||
testlog.info("Creating tables, with max size={}", sstables::pretty_printed_data_size(per_table_max_disk_usage));
|
||||
|
||||
std::vector<lw_shared_ptr<replica::table>> tables;
|
||||
uint64_t tables_total_size = 0;
|
||||
|
||||
for (uint64_t t_idx = 0, available_space = all_tables_disk_usage; available_space >= estimated_flush_size; t_idx++) {
|
||||
auto target_disk_usage = std::min(available_space, per_table_max_disk_usage);
|
||||
auto tiers = get_total_tiers(target_disk_usage);
|
||||
|
||||
auto t = create_table();
|
||||
for (auto tier_idx = 0; tier_idx < tiers; tier_idx++) {
|
||||
auto tier_size = get_size_for_tier(tier_idx);
|
||||
if (tier_size > available_space) {
|
||||
break;
|
||||
}
|
||||
add_sstable(*t, tier_size);
|
||||
available_space -= std::min(available_space, uint64_t(tier_size));
|
||||
}
|
||||
|
||||
auto table_size = t->get_stats().live_disk_space_used;
|
||||
testlog.debug("T{}: {} tiers, with total size={}", t_idx, tiers, sstables::pretty_printed_data_size(table_size));
|
||||
tables.push_back(t);
|
||||
tables_total_size += table_size;
|
||||
}
|
||||
testlog.debug("Created {} tables, with total size={}", tables.size(), sstables::pretty_printed_data_size(tables_total_size));
|
||||
results.push_back(result{ tables.size(), per_table_max_disk_usage, normalize_backlog(manager.backlog()) });
|
||||
for (auto& t : tables) {
|
||||
t->stop().get();
|
||||
}
|
||||
}
|
||||
for (auto& r : results) {
|
||||
testlog.info("Tables={} with max size={} -> NormalizedBacklog={}", r.table_count, sstables::pretty_printed_data_size(r.per_table_max_disk_usage), r.normalized_backlog);
|
||||
// Expect 0 backlog as tiers are all perfectly compacted
|
||||
BOOST_REQUIRE(r.normalized_backlog == 0.0f);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -198,6 +198,7 @@ public:
|
||||
// Used to create synthetic sstables for testing leveled compaction strategy.
|
||||
void set_values_for_leveled_strategy(uint64_t fake_data_size, uint32_t sstable_level, int64_t max_timestamp, sstring first_key, sstring last_key) {
|
||||
_sst->_data_file_size = fake_data_size;
|
||||
_sst->_bytes_on_disk = fake_data_size;
|
||||
// Create a synthetic stats metadata
|
||||
stats_metadata stats = {};
|
||||
// leveled strategy sorts sstables by age using max_timestamp, let's set it to 0.
|
||||
|
||||
Reference in New Issue
Block a user