compaction_backlog_tracker: Batch changes through a new replacement interface

This new interface allows the table to communicate multiple changes in the
SSTable set with a single call, which is useful on compaction completion,
for example.
With this new interface, the size-tiered backlog tracker will be able to
know when a compaction has completed, which will allow it to recompute tiers
and their backlog contribution, if any. Without it, the tiered tracker
would have to recompute tiers for every change, which would be terribly
expensive.
The old remove/add interfaces are being removed in favor of the new one.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2022-02-21 16:51:40 -03:00
parent 84d843697b
commit ddd693c6d7
6 changed files with 72 additions and 73 deletions

View File

@@ -60,8 +60,7 @@ public:
using ongoing_compactions = std::unordered_map<sstables::shared_sstable, backlog_read_progress_manager*>;
struct impl {
virtual void add_sstable(sstables::shared_sstable sst) = 0;
virtual void remove_sstable(sstables::shared_sstable sst) = 0;
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) = 0;
virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0;
virtual ~impl() { }
};
@@ -72,8 +71,7 @@ public:
~compaction_backlog_tracker();
double backlog() const;
void add_sstable(sstables::shared_sstable sst);
void remove_sstable(sstables::shared_sstable sst);
void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts);
void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp);
void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp);
void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true);

View File

@@ -237,8 +237,7 @@ private:
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return _added_backlog * _available_memory;
}
virtual void add_sstable(sstables::shared_sstable sst) override { }
virtual void remove_sstable(sstables::shared_sstable sst) override { }
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
};
compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) {
@@ -1150,27 +1149,23 @@ double compaction_backlog_tracker::backlog() const {
return disabled() ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions);
}
void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
if (disabled() || !sstable_belongs_to_tracker(sst)) {
void compaction_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) {
if (disabled()) {
return;
}
_ongoing_writes.erase(sst);
try {
_impl->add_sstable(std::move(sst));
} catch (...) {
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
disable();
}
}
auto filter_and_revert_charges = [this] (const std::vector<sstables::shared_sstable>& ssts) {
std::vector<sstables::shared_sstable> ret;
for (auto& sst : ssts) {
if (sstable_belongs_to_tracker(sst)) {
revert_charges(sst);
ret.push_back(sst);
}
}
return ret;
};
void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
if (disabled() || !sstable_belongs_to_tracker(sst)) {
return;
}
_ongoing_compactions.erase(sst);
try {
_impl->remove_sstable(std::move(sst));
_impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts));
} catch (...) {
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
disable();

View File

@@ -131,17 +131,18 @@ double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::on
return b > 0 ? b : 0;
}
void size_tiered_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
if (sst->data_size() > 0) {
_total_bytes += sst->data_size();
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
void size_tiered_backlog_tracker::replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) {
for (auto& sst : new_ssts) {
if (sst->data_size() > 0) {
_total_bytes += sst->data_size();
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
}
}
}
void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
if (sst->data_size() > 0) {
_total_bytes -= sst->data_size();
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
for (auto& sst : old_ssts) {
if (sst->data_size() > 0) {
_total_bytes -= sst->data_size();
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
}
}
}
@@ -218,18 +219,29 @@ public:
return b;
}
virtual void add_sstable(sstables::shared_sstable sst) override {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
_windows[bound].add_sstable(sst);
}
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
struct replacement {
std::vector<sstables::shared_sstable> old_ssts;
std::vector<sstables::shared_sstable> new_ssts;
};
std::unordered_map<api::timestamp_type, replacement> per_window_replacement;
virtual void remove_sstable(sstables::shared_sstable sst) override {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
auto it = _windows.find(bound);
if (it != _windows.end()) {
it->second.remove_sstable(sst);
if (it->second.total_bytes() <= 0) {
_windows.erase(it);
for (auto& sst : new_ssts) {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
per_window_replacement[bound].new_ssts.push_back(std::move(sst));
}
for (auto& sst : old_ssts) {
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
if (_windows.contains(bound)) {
per_window_replacement[bound].old_ssts.push_back(std::move(sst));
}
}
for (auto& [bound, r] : per_window_replacement) {
auto& w = _windows[bound];
w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts));
if (w.total_bytes() <= 0) {
_windows.erase(bound);
}
}
}
@@ -288,20 +300,23 @@ public:
return b;
}
virtual void add_sstable(sstables::shared_sstable sst) override {
auto level = sst->get_sstable_level();
_size_per_level[level] += sst->data_size();
if (level == 0) {
_l0_scts.add_sstable(sst);
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
std::vector<sstables::shared_sstable> l0_old_ssts, l0_new_ssts;
for (auto& sst : new_ssts) {
auto level = sst->get_sstable_level();
_size_per_level[level] += sst->data_size();
if (level == 0) {
l0_new_ssts.push_back(std::move(sst));
}
}
}
virtual void remove_sstable(sstables::shared_sstable sst) override {
auto level = sst->get_sstable_level();
_size_per_level[level] -= sst->data_size();
if (level == 0) {
_l0_scts.remove_sstable(sst);
for (auto& sst : old_ssts) {
auto level = sst->get_sstable_level();
_size_per_level[level] -= sst->data_size();
if (level == 0) {
l0_old_ssts.push_back(std::move(sst));
}
}
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
}
};
@@ -309,16 +324,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return compaction_controller::disable_backlog;
}
virtual void add_sstable(sstables::shared_sstable sst) override { }
virtual void remove_sstable(sstables::shared_sstable sst) override { }
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
};
struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return 0;
}
virtual void add_sstable(sstables::shared_sstable sst) override { }
virtual void remove_sstable(sstables::shared_sstable sst) override { }
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
};
// Just so that if we have more than one CF with NullStrategy, we don't create a lot

View File

@@ -82,11 +82,9 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp
public:
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override;
virtual void add_sstable(sstables::shared_sstable sst) override;
// Removing could be the result of a failure of an in progress write, successful finish of a
// compaction, or some one-off operation, like drop
virtual void remove_sstable(sstables::shared_sstable sst) override;
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override;
int64_t total_bytes() const {
return _total_bytes;

View File

@@ -368,21 +368,16 @@ void table::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) no
}
inline void table::add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
tracker.add_sstable(std::move(sstable));
tracker.replace_sstables({}, {std::move(sstable)});
}
inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
tracker.remove_sstable(std::move(sstable));
tracker.replace_sstables({std::move(sstable)}, {});
}
void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
auto& tracker = _compaction_strategy.get_backlog_tracker();
for (auto& sst : new_sstables) {
tracker.add_sstable(sst);
}
for (auto& sst : old_sstables) {
tracker.remove_sstable(sst);
}
tracker.replace_sstables(old_sstables, new_sstables);
}
lw_shared_ptr<sstables::sstable_set>

View File

@@ -3209,7 +3209,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
};
for (auto& sst : ssts) {
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst});
}
// Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog.
@@ -3220,7 +3220,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
// set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
for (auto& sst : ssts) {
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst});
}
auto ret = fut.get0();