compaction_manager: serialize compaction of same size tier for different cfs
Currently, the compaction manager serializes compactions of the same size tier (or weight) if they belong to the same column family. However, it fails to do so if the compaction jobs belong to different column families. That can lead to an ungodly number of running compactions, which gets worse the higher the number of shards and active column families. The problem is that it may affect overall system performance due to excessive resource usage. It's easy to trigger during bootstrapping, after loading a node with new sstables or repairing, or if lots of cfs are being actively written. That being said, compaction jobs of the same size tier are now serialized on a given shard, such that the maximum number of compactions (system-wise) is now: (SHARDS) * (SIZE TIERS) instead of: (SHARDS) * (COLUMN FAMILIES) * (SIZE TIERS). We'll work hard to release a size tier (weight) for a column family waiting on it as fast as possible, given that we wouldn't like to underutilize the resources available for compaction. We want one starting right after the other. A compaction for a column family that cannot run now because the size tier is taken will be postponed. There's a worker that sleeps on a condition variable that is signalled whenever a compaction completes. FIFO ordering is used on the postponed list for fairness. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -1324,7 +1324,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] {
|
||||
return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] () mutable {
|
||||
auto create_sstable = [this] {
|
||||
auto gen = this->calculate_generation_for_new_table();
|
||||
auto sst = sstables::make_sstable(_schema, _config.datadir, gen,
|
||||
|
||||
@@ -356,6 +356,7 @@ class regular_compaction : public compaction {
|
||||
// sstable being currently written.
|
||||
shared_sstable _sst;
|
||||
stdx::optional<sstable_writer> _writer;
|
||||
stdx::optional<compaction_weight_registration> _weight_registration;
|
||||
public:
|
||||
regular_compaction(column_family& cf, compaction_descriptor descriptor, std::function<shared_sstable()> creator,
|
||||
seastar::thread_scheduling_group* tsg)
|
||||
@@ -363,6 +364,7 @@ public:
|
||||
, _creator(std::move(creator))
|
||||
, _set(cf.get_sstable_set())
|
||||
, _selector(_set.make_incremental_selector())
|
||||
, _weight_registration(std::move(descriptor.weight_registration))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -405,10 +407,17 @@ public:
|
||||
}
|
||||
|
||||
virtual void finish_sstable_writer() override {
|
||||
on_end_of_stream();
|
||||
if (_writer) {
|
||||
stop_sstable_writer();
|
||||
}
|
||||
}
|
||||
private:
|
||||
void on_end_of_stream() {
|
||||
if (_weight_registration) {
|
||||
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class cleanup_compaction final : public regular_compaction {
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "database_fwd.hh"
|
||||
#include "shared_sstable.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "compaction_weight_registration.hh"
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <functional>
|
||||
|
||||
@@ -37,6 +38,8 @@ namespace sstables {
|
||||
int level;
|
||||
// Threshold size for sstable(s) to be created.
|
||||
uint64_t max_sstable_bytes;
|
||||
// Holds ownership of a weight assigned to this compaction iff it's a regular one.
|
||||
stdx::optional<compaction_weight_registration> weight_registration;
|
||||
|
||||
compaction_descriptor() = default;
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include "exceptions.hh"
|
||||
#include <cmath>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
|
||||
static logging::logger cmlog("compaction_manager");
|
||||
|
||||
@@ -69,7 +70,7 @@ compaction_weight_registration::compaction_weight_registration(compaction_manage
|
||||
, _cf(cf)
|
||||
, _weight(weight)
|
||||
{
|
||||
_cm->register_weight(_cf, _weight);
|
||||
_cm->register_weight(_weight);
|
||||
}
|
||||
|
||||
compaction_weight_registration& compaction_weight_registration::operator=(compaction_weight_registration&& other) noexcept {
|
||||
@@ -92,12 +93,12 @@ compaction_weight_registration::compaction_weight_registration(compaction_weight
|
||||
|
||||
compaction_weight_registration::~compaction_weight_registration() {
|
||||
if (_cm) {
|
||||
_cm->deregister_weight(_cf, _weight);
|
||||
_cm->deregister_weight(_weight);
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_weight_registration::deregister() {
|
||||
_cm->deregister_weight(_cf, _weight);
|
||||
_cm->deregister_weight(_weight);
|
||||
_cm = nullptr;
|
||||
}
|
||||
|
||||
@@ -140,17 +141,12 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
|
||||
if (descriptor.level != 0 || descriptor.sstables.empty()) {
|
||||
return weight;
|
||||
}
|
||||
auto it = _weight_tracker.find(cf);
|
||||
if (it == _weight_tracker.end()) {
|
||||
return weight;
|
||||
}
|
||||
|
||||
std::unordered_set<int>& s = it->second;
|
||||
uint64_t total_size = get_total_size(descriptor.sstables);
|
||||
int min_threshold = cf->schema()->min_compaction_threshold();
|
||||
|
||||
while (descriptor.sstables.size() > size_t(min_threshold)) {
|
||||
if (s.count(weight)) {
|
||||
if (_weight_tracker.count(weight)) {
|
||||
total_size -= descriptor.sstables.back()->data_size();
|
||||
descriptor.sstables.pop_back();
|
||||
weight = calculate_weight(total_size);
|
||||
@@ -161,20 +157,25 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
|
||||
return weight;
|
||||
}
|
||||
|
||||
bool compaction_manager::can_register_weight(column_family* cf, int weight, bool parallel_compaction) {
|
||||
auto it = _weight_tracker.find(cf);
|
||||
if (it == _weight_tracker.end()) {
|
||||
bool compaction_manager::can_register_weight(column_family* cf, int weight) {
|
||||
if (_weight_tracker.empty()) {
|
||||
return true;
|
||||
}
|
||||
std::unordered_set<int>& s = it->second;
|
||||
|
||||
auto has_cf_ongoing_compaction = [&] {
|
||||
return boost::algorithm::any_of(_tasks, [&] (const lw_shared_ptr<task>& task) {
|
||||
return task->compacting_cf == cf;
|
||||
});
|
||||
};
|
||||
|
||||
// Only one weight is allowed if parallel compaction is disabled.
|
||||
if (!parallel_compaction && !s.empty()) {
|
||||
if (!cf->get_compaction_strategy().parallel_compaction() && has_cf_ongoing_compaction()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Maybe allow only *smaller* compactions to start? That can be done
|
||||
// by returning true only if weight is not in the set and is lower than any
|
||||
// entry in the set.
|
||||
if (s.count(weight)) {
|
||||
if (_weight_tracker.count(weight)) {
|
||||
// If reached this point, it means that there is an ongoing compaction
|
||||
// with the weight of the compaction job.
|
||||
return false;
|
||||
@@ -182,19 +183,13 @@ bool compaction_manager::can_register_weight(column_family* cf, int weight, bool
|
||||
return true;
|
||||
}
|
||||
|
||||
void compaction_manager::register_weight(column_family* cf, int weight) {
|
||||
auto it = _weight_tracker.find(cf);
|
||||
if (it == _weight_tracker.end()) {
|
||||
_weight_tracker.insert({cf, {weight}});
|
||||
} else {
|
||||
it->second.insert(weight);
|
||||
}
|
||||
void compaction_manager::register_weight(int weight) {
|
||||
_weight_tracker.insert(weight);
|
||||
}
|
||||
|
||||
void compaction_manager::deregister_weight(column_family* cf, int weight) {
|
||||
auto it = _weight_tracker.find(cf);
|
||||
assert(it != _weight_tracker.end());
|
||||
it->second.erase(weight);
|
||||
void compaction_manager::deregister_weight(int weight) {
|
||||
_weight_tracker.erase(weight);
|
||||
reevalute_postponed_compactions();
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const column_family& cf) {
|
||||
@@ -381,6 +376,7 @@ void compaction_manager::start() {
|
||||
_stopped = false;
|
||||
register_metrics();
|
||||
_compaction_submission_timer.arm(periodic_compaction_submission_interval());
|
||||
postponed_compactions_reevaluation();
|
||||
}
|
||||
|
||||
std::function<void()> compaction_manager::compaction_submission_callback() {
|
||||
@@ -391,6 +387,34 @@ std::function<void()> compaction_manager::compaction_submission_callback() {
|
||||
};
|
||||
}
|
||||
|
||||
void compaction_manager::postponed_compactions_reevaluation() {
|
||||
_waiting_reevalution = repeat([this] {
|
||||
return _postponed_reevaluation.wait().then([this] {
|
||||
if (_stopped) {
|
||||
_postponed.clear();
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
auto postponed = std::move(_postponed);
|
||||
try {
|
||||
for (auto& cf : postponed) {
|
||||
submit(cf);
|
||||
}
|
||||
} catch (...) {
|
||||
_postponed = std::move(postponed);
|
||||
}
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void compaction_manager::reevalute_postponed_compactions() {
|
||||
_postponed_reevaluation.signal();
|
||||
}
|
||||
|
||||
void compaction_manager::postpone_compaction_for_column_family(column_family* cf) {
|
||||
_postponed.push_back(cf);
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop() {
|
||||
cmlog.info("Asked to stop");
|
||||
if (_stopped) {
|
||||
@@ -410,6 +434,9 @@ future<> compaction_manager::stop() {
|
||||
return parallel_for_each(tasks, [this] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
});
|
||||
}).then([this] () mutable {
|
||||
reevalute_postponed_compactions();
|
||||
return std::move(_waiting_reevalution);
|
||||
}).then([this] {
|
||||
_weight_tracker.clear();
|
||||
_compaction_submission_timer.cancel();
|
||||
@@ -466,22 +493,25 @@ void compaction_manager::submit(column_family* cf) {
|
||||
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(cf, get_candidates(cf));
|
||||
int weight = trim_to_compact(&cf, descriptor);
|
||||
|
||||
// Stop compaction task immediately if strategy is satisfied or job cannot run in parallel.
|
||||
if (descriptor.sstables.empty() || !can_register_weight(&cf, weight, cs.parallel_compaction())) {
|
||||
if (descriptor.sstables.empty() || !can_proceed(task)) {
|
||||
_stats.pending_tasks--;
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
if (!can_register_weight(&cf, weight)) {
|
||||
_stats.pending_tasks--;
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
|
||||
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
postpone_compaction_for_column_family(&cf);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
auto compacting = compacting_sstable_registration(this, descriptor.sstables);
|
||||
auto c_weight = compaction_weight_registration(this, &cf, weight);
|
||||
descriptor.weight_registration = compaction_weight_registration(this, &cf, weight);
|
||||
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
|
||||
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
return cf.run_compaction(std::move(descriptor))
|
||||
.then_wrapped([this, task, compacting = std::move(compacting), c_weight = std::move(c_weight)] (future<> f) mutable {
|
||||
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
|
||||
if (!can_proceed(task)) {
|
||||
@@ -579,11 +609,12 @@ future<> compaction_manager::remove(column_family* cf) {
|
||||
task->stopping = true;
|
||||
}
|
||||
}
|
||||
_postponed.erase(boost::remove(_postponed, cf), _postponed.end());
|
||||
|
||||
// Wait for the termination of an ongoing compaction on cf, if any.
|
||||
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
}).then([this, cf, tasks_to_stop] {
|
||||
_weight_tracker.erase(cf);
|
||||
_compaction_locks.erase(cf);
|
||||
});
|
||||
}
|
||||
@@ -605,3 +636,8 @@ void compaction_manager::stop_compaction(sstring type) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_manager::on_compaction_complete(compaction_weight_registration& weight_registration) {
|
||||
weight_registration.deregister();
|
||||
reevalute_postponed_compactions();
|
||||
}
|
||||
|
||||
@@ -73,9 +73,13 @@ private:
|
||||
// a sstable from being compacted twice.
|
||||
std::unordered_set<sstables::shared_sstable> _compacting_sstables;
|
||||
|
||||
// Keep track of weight of ongoing compaction for each column family.
|
||||
// That's used to allow parallel compaction on the same column family.
|
||||
std::unordered_map<column_family*, std::unordered_set<int>> _weight_tracker;
|
||||
future<> _waiting_reevalution = make_ready_future<>();
|
||||
condition_variable _postponed_reevaluation;
|
||||
// column families that wait for compaction but had their submission postponed due to an ongoing compaction.
|
||||
std::vector<column_family*> _postponed;
|
||||
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
|
||||
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
|
||||
std::unordered_set<int> _weight_tracker;
|
||||
|
||||
// Purpose is to serialize major compaction across all column families, so as to
|
||||
// reduce disk space requirement.
|
||||
@@ -93,14 +97,13 @@ private:
|
||||
private:
|
||||
future<> task_stop(lw_shared_ptr<task> task);
|
||||
|
||||
// Return true if weight is not registered. If parallel_compaction is not
|
||||
// true, only one weight is allowed to be registered.
|
||||
bool can_register_weight(column_family* cf, int weight, bool parallel_compaction);
|
||||
// Return true if weight is not registered.
|
||||
bool can_register_weight(column_family* cf, int weight);
|
||||
// Register weight for a column family. Do that only if can_register_weight()
|
||||
// returned true.
|
||||
void register_weight(column_family* cf, int weight);
|
||||
void register_weight(int weight);
|
||||
// Deregister weight for a column family.
|
||||
void deregister_weight(column_family* cf, int weight);
|
||||
void deregister_weight(int weight);
|
||||
|
||||
// If weight of compaction job is taken, it will be trimmed until its new
|
||||
// weight is not taken or its size is equal to minimum threshold.
|
||||
@@ -125,6 +128,12 @@ private:
|
||||
// stop of transportation services. It cannot make progress anyway.
|
||||
// Returns true if error is judged not fatal, and compaction can be retried.
|
||||
inline bool maybe_stop_on_error(future<> f);
|
||||
|
||||
void postponed_compactions_reevaluation();
|
||||
void reevalute_postponed_compactions();
|
||||
// Postpone compaction for a column family that couldn't be executed due to ongoing
|
||||
// similar-sized compaction.
|
||||
void postpone_compaction_for_column_family(column_family* cf);
|
||||
public:
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
@@ -188,6 +197,11 @@ public:
|
||||
// Stops ongoing compaction of a given type.
|
||||
void stop_compaction(sstring type);
|
||||
|
||||
// Called by compaction procedure to release the weight lock assigned to it, such that
|
||||
// another compaction waiting on same weight can start as soon as possible. That's usually
|
||||
// called before compaction seals sstable and such and after all compaction work is done.
|
||||
void on_compaction_complete(compaction_weight_registration& weight_registration);
|
||||
|
||||
friend class compacting_sstable_registration;
|
||||
friend class compaction_weight_registration;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user