compaction_backlog_tracker: Batch changes through a new replacement interface
This new interface allows table to communicate multiple changes in the SSTable set with a single call, which is useful on compaction completion for example. With this new interface, the size tiered backlog tracker will be able to know when compaction completed, which will allow it to recompute tiers and their backlog contribution, if any. Without it, tiered tracker would have to recompute tiers for every change, which would be terribly expensive. The old remove/add interface are being removed in favor of the new one. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -60,8 +60,7 @@ public:
|
||||
using ongoing_compactions = std::unordered_map<sstables::shared_sstable, backlog_read_progress_manager*>;
|
||||
|
||||
struct impl {
|
||||
virtual void add_sstable(sstables::shared_sstable sst) = 0;
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) = 0;
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) = 0;
|
||||
virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0;
|
||||
virtual ~impl() { }
|
||||
};
|
||||
@@ -72,8 +71,7 @@ public:
|
||||
~compaction_backlog_tracker();
|
||||
|
||||
double backlog() const;
|
||||
void add_sstable(sstables::shared_sstable sst);
|
||||
void remove_sstable(sstables::shared_sstable sst);
|
||||
void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts);
|
||||
void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp);
|
||||
void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp);
|
||||
void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true);
|
||||
|
||||
@@ -237,8 +237,7 @@ private:
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return _added_backlog * _available_memory;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
|
||||
};
|
||||
|
||||
compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) {
|
||||
@@ -1150,27 +1149,23 @@ double compaction_backlog_tracker::backlog() const {
|
||||
return disabled() ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions);
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (disabled() || !sstable_belongs_to_tracker(sst)) {
|
||||
void compaction_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) {
|
||||
if (disabled()) {
|
||||
return;
|
||||
}
|
||||
_ongoing_writes.erase(sst);
|
||||
try {
|
||||
_impl->add_sstable(std::move(sst));
|
||||
} catch (...) {
|
||||
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
|
||||
disable();
|
||||
}
|
||||
}
|
||||
auto filter_and_revert_charges = [this] (const std::vector<sstables::shared_sstable>& ssts) {
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
for (auto& sst : ssts) {
|
||||
if (sstable_belongs_to_tracker(sst)) {
|
||||
revert_charges(sst);
|
||||
ret.push_back(sst);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
|
||||
void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (disabled() || !sstable_belongs_to_tracker(sst)) {
|
||||
return;
|
||||
}
|
||||
|
||||
_ongoing_compactions.erase(sst);
|
||||
try {
|
||||
_impl->remove_sstable(std::move(sst));
|
||||
_impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts));
|
||||
} catch (...) {
|
||||
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
|
||||
disable();
|
||||
|
||||
@@ -131,17 +131,18 @@ double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::on
|
||||
return b > 0 ? b : 0;
|
||||
}
|
||||
|
||||
void size_tiered_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes += sst->data_size();
|
||||
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
|
||||
void size_tiered_backlog_tracker::replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) {
|
||||
for (auto& sst : new_ssts) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes += sst->data_size();
|
||||
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes -= sst->data_size();
|
||||
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
|
||||
for (auto& sst : old_ssts) {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes -= sst->data_size();
|
||||
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,18 +219,29 @@ public:
|
||||
return b;
|
||||
}
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
_windows[bound].add_sstable(sst);
|
||||
}
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
|
||||
struct replacement {
|
||||
std::vector<sstables::shared_sstable> old_ssts;
|
||||
std::vector<sstables::shared_sstable> new_ssts;
|
||||
};
|
||||
std::unordered_map<api::timestamp_type, replacement> per_window_replacement;
|
||||
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
auto it = _windows.find(bound);
|
||||
if (it != _windows.end()) {
|
||||
it->second.remove_sstable(sst);
|
||||
if (it->second.total_bytes() <= 0) {
|
||||
_windows.erase(it);
|
||||
for (auto& sst : new_ssts) {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
per_window_replacement[bound].new_ssts.push_back(std::move(sst));
|
||||
}
|
||||
for (auto& sst : old_ssts) {
|
||||
auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp);
|
||||
if (_windows.contains(bound)) {
|
||||
per_window_replacement[bound].old_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& [bound, r] : per_window_replacement) {
|
||||
auto& w = _windows[bound];
|
||||
w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts));
|
||||
if (w.total_bytes() <= 0) {
|
||||
_windows.erase(bound);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -288,20 +300,23 @@ public:
|
||||
return b;
|
||||
}
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] += sst->data_size();
|
||||
if (level == 0) {
|
||||
_l0_scts.add_sstable(sst);
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {
|
||||
std::vector<sstables::shared_sstable> l0_old_ssts, l0_new_ssts;
|
||||
for (auto& sst : new_ssts) {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] += sst->data_size();
|
||||
if (level == 0) {
|
||||
l0_new_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] -= sst->data_size();
|
||||
if (level == 0) {
|
||||
_l0_scts.remove_sstable(sst);
|
||||
for (auto& sst : old_ssts) {
|
||||
auto level = sst->get_sstable_level();
|
||||
_size_per_level[level] -= sst->data_size();
|
||||
if (level == 0) {
|
||||
l0_old_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -309,16 +324,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return compaction_controller::disable_backlog;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
|
||||
};
|
||||
|
||||
struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return 0;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
|
||||
};
|
||||
|
||||
// Just so that if we have more than one CF with NullStrategy, we don't create a lot
|
||||
|
||||
@@ -82,11 +82,9 @@ class size_tiered_backlog_tracker final : public compaction_backlog_tracker::imp
|
||||
public:
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override;
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override;
|
||||
|
||||
// Removing could be the result of a failure of an in progress write, successful finish of a
|
||||
// compaction, or some one-off operation, like drop
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override;
|
||||
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override;
|
||||
|
||||
int64_t total_bytes() const {
|
||||
return _total_bytes;
|
||||
|
||||
@@ -368,21 +368,16 @@ void table::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) no
|
||||
}
|
||||
|
||||
inline void table::add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
||||
tracker.add_sstable(std::move(sstable));
|
||||
tracker.replace_sstables({}, {std::move(sstable)});
|
||||
}
|
||||
|
||||
inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
|
||||
tracker.remove_sstable(std::move(sstable));
|
||||
tracker.replace_sstables({std::move(sstable)}, {});
|
||||
}
|
||||
|
||||
void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
|
||||
auto& tracker = _compaction_strategy.get_backlog_tracker();
|
||||
for (auto& sst : new_sstables) {
|
||||
tracker.add_sstable(sst);
|
||||
}
|
||||
for (auto& sst : old_sstables) {
|
||||
tracker.remove_sstable(sst);
|
||||
}
|
||||
tracker.replace_sstables(old_sstables, new_sstables);
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set>
|
||||
|
||||
@@ -3209,7 +3209,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
|
||||
};
|
||||
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst});
|
||||
}
|
||||
|
||||
// Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog.
|
||||
@@ -3220,7 +3220,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
|
||||
// set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst});
|
||||
}
|
||||
|
||||
auto ret = fut.get0();
|
||||
|
||||
Reference in New Issue
Block a user