compaction_manager: serialize compaction of same size tier for different cfs

Currently, compaction manager will serialize compaction of same size tier
(or weight) if they belong to the same column family. However, it fails to
do so if the compaction jobs belong to different column families.
That can lead to an ungodly number of concurrently running compactions, which gets worse
the higher the number of shards and active column families. The problem
is that it may affect overall system performance due to excessive resource
usage. It's easy to trigger during bootstrapping, after loading a node with
new sstables, after repairing, or if lots of cfs are being actively written.

That being said, compaction jobs of same size tier are now serialized
on a given shard, such that the maximum number of compactions (system-wide)
is now:
(SHARDS) * (SIZE TIERS)
instead of:
(SHARDS) * (COLUMN FAMILIES) * (SIZE TIERS)

We'll work hard to release a size tier (weight) for a column family
waiting on it as fast as possible, given that we wouldn't like to
underutilize resources available for compaction. We want one starting
after the other. Compaction for a column family that cannot run now
because the size tier is taken, will be postponed. There's a worker
that will be sleeping on a condition variable that will be signalled
whenever a compaction completes. FIFO ordering is used on postponed
list for fairness.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2017-12-17 17:39:04 -02:00
parent fa0e53f626
commit eff62bc61e
5 changed files with 104 additions and 42 deletions

View File

@@ -1324,7 +1324,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
return make_ready_future<>();
}
return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] {
return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] () mutable {
auto create_sstable = [this] {
auto gen = this->calculate_generation_for_new_table();
auto sst = sstables::make_sstable(_schema, _config.datadir, gen,

View File

@@ -356,6 +356,7 @@ class regular_compaction : public compaction {
// sstable being currently written.
shared_sstable _sst;
stdx::optional<sstable_writer> _writer;
stdx::optional<compaction_weight_registration> _weight_registration;
public:
regular_compaction(column_family& cf, compaction_descriptor descriptor, std::function<shared_sstable()> creator,
seastar::thread_scheduling_group* tsg)
@@ -363,6 +364,7 @@ public:
, _creator(std::move(creator))
, _set(cf.get_sstable_set())
, _selector(_set.make_incremental_selector())
, _weight_registration(std::move(descriptor.weight_registration))
{
}
@@ -405,10 +407,17 @@ public:
}
virtual void finish_sstable_writer() override {
on_end_of_stream();
if (_writer) {
stop_sstable_writer();
}
}
private:
void on_end_of_stream() {
if (_weight_registration) {
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
}
}
};
class cleanup_compaction final : public regular_compaction {

View File

@@ -25,6 +25,7 @@
#include "database_fwd.hh"
#include "shared_sstable.hh"
#include "gc_clock.hh"
#include "compaction_weight_registration.hh"
#include <seastar/core/thread.hh>
#include <functional>
@@ -37,6 +38,8 @@ namespace sstables {
int level;
// Threshold size for sstable(s) to be created.
uint64_t max_sstable_bytes;
// Holds ownership of a weight assigned to this compaction iff it's a regular one.
stdx::optional<compaction_weight_registration> weight_registration;
compaction_descriptor() = default;

View File

@@ -25,6 +25,7 @@
#include <seastar/core/metrics.hh>
#include "exceptions.hh"
#include <cmath>
#include <boost/algorithm/cxx11/any_of.hpp>
static logging::logger cmlog("compaction_manager");
@@ -69,7 +70,7 @@ compaction_weight_registration::compaction_weight_registration(compaction_manage
, _cf(cf)
, _weight(weight)
{
_cm->register_weight(_cf, _weight);
_cm->register_weight(_weight);
}
compaction_weight_registration& compaction_weight_registration::operator=(compaction_weight_registration&& other) noexcept {
@@ -92,12 +93,12 @@ compaction_weight_registration::compaction_weight_registration(compaction_weight
compaction_weight_registration::~compaction_weight_registration() {
if (_cm) {
_cm->deregister_weight(_cf, _weight);
_cm->deregister_weight(_weight);
}
}
void compaction_weight_registration::deregister() {
_cm->deregister_weight(_cf, _weight);
_cm->deregister_weight(_weight);
_cm = nullptr;
}
@@ -140,17 +141,12 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
if (descriptor.level != 0 || descriptor.sstables.empty()) {
return weight;
}
auto it = _weight_tracker.find(cf);
if (it == _weight_tracker.end()) {
return weight;
}
std::unordered_set<int>& s = it->second;
uint64_t total_size = get_total_size(descriptor.sstables);
int min_threshold = cf->schema()->min_compaction_threshold();
while (descriptor.sstables.size() > size_t(min_threshold)) {
if (s.count(weight)) {
if (_weight_tracker.count(weight)) {
total_size -= descriptor.sstables.back()->data_size();
descriptor.sstables.pop_back();
weight = calculate_weight(total_size);
@@ -161,20 +157,25 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
return weight;
}
bool compaction_manager::can_register_weight(column_family* cf, int weight, bool parallel_compaction) {
auto it = _weight_tracker.find(cf);
if (it == _weight_tracker.end()) {
bool compaction_manager::can_register_weight(column_family* cf, int weight) {
if (_weight_tracker.empty()) {
return true;
}
std::unordered_set<int>& s = it->second;
auto has_cf_ongoing_compaction = [&] {
return boost::algorithm::any_of(_tasks, [&] (const lw_shared_ptr<task>& task) {
return task->compacting_cf == cf;
});
};
// Only one weight is allowed if parallel compaction is disabled.
if (!parallel_compaction && !s.empty()) {
if (!cf->get_compaction_strategy().parallel_compaction() && has_cf_ongoing_compaction()) {
return false;
}
// TODO: Maybe allow only *smaller* compactions to start? That can be done
// by returning true only if weight is not in the set and is lower than any
// entry in the set.
if (s.count(weight)) {
if (_weight_tracker.count(weight)) {
// If reached this point, it means that there is an ongoing compaction
// with the weight of the compaction job.
return false;
@@ -182,19 +183,13 @@ bool compaction_manager::can_register_weight(column_family* cf, int weight, bool
return true;
}
void compaction_manager::register_weight(column_family* cf, int weight) {
auto it = _weight_tracker.find(cf);
if (it == _weight_tracker.end()) {
_weight_tracker.insert({cf, {weight}});
} else {
it->second.insert(weight);
}
void compaction_manager::register_weight(int weight) {
_weight_tracker.insert(weight);
}
void compaction_manager::deregister_weight(column_family* cf, int weight) {
auto it = _weight_tracker.find(cf);
assert(it != _weight_tracker.end());
it->second.erase(weight);
void compaction_manager::deregister_weight(int weight) {
_weight_tracker.erase(weight);
reevalute_postponed_compactions();
}
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const column_family& cf) {
@@ -381,6 +376,7 @@ void compaction_manager::start() {
_stopped = false;
register_metrics();
_compaction_submission_timer.arm(periodic_compaction_submission_interval());
postponed_compactions_reevaluation();
}
std::function<void()> compaction_manager::compaction_submission_callback() {
@@ -391,6 +387,34 @@ std::function<void()> compaction_manager::compaction_submission_callback() {
};
}
// Background fiber that re-submits postponed compactions. It sleeps on
// _postponed_reevaluation, which is signalled whenever a compaction weight
// is released (see deregister_weight / on_compaction_complete) and on stop().
// FIFO order over _postponed keeps resubmission fair across column families.
void compaction_manager::postponed_compactions_reevaluation() {
    _waiting_reevalution = repeat([this] {
        return _postponed_reevaluation.wait().then([this] {
            if (_stopped) {
                // Shutting down: drop pending work and terminate the fiber.
                _postponed.clear();
                return stop_iteration::yes;
            }
            // Take ownership of the current postponed list; submit() below
            // may postpone column families again, repopulating _postponed.
            auto postponed = std::move(_postponed);
            try {
                for (auto& cf : postponed) {
                    submit(cf);
                }
            } catch (...) {
                // Merge rather than overwrite: submit() may already have
                // re-postponed some cfs into _postponed, and assigning over
                // it would lose them. Duplicates are harmless, since submit()
                // re-evaluates what (if anything) needs compacting.
                postponed.insert(postponed.end(), _postponed.begin(), _postponed.end());
                _postponed = std::move(postponed);
            }
            return stop_iteration::no;
        });
    });
}
// Wake up the reevaluation fiber (postponed_compactions_reevaluation) so that
// compactions postponed because their weight (size tier) was taken get
// resubmitted as soon as possible. Cheap to call; just signals the condvar.
void compaction_manager::reevalute_postponed_compactions() {
_postponed_reevaluation.signal();
}
// Record that cf could not be compacted now (its weight / size tier is taken
// by an ongoing compaction) so the reevaluation fiber retries it later.
// Appending to the back preserves FIFO ordering for fairness.
void compaction_manager::postpone_compaction_for_column_family(column_family* cf) {
    _postponed.emplace_back(cf);
}
future<> compaction_manager::stop() {
cmlog.info("Asked to stop");
if (_stopped) {
@@ -410,6 +434,9 @@ future<> compaction_manager::stop() {
return parallel_for_each(tasks, [this] (auto& task) {
return this->task_stop(task);
});
}).then([this] () mutable {
reevalute_postponed_compactions();
return std::move(_waiting_reevalution);
}).then([this] {
_weight_tracker.clear();
_compaction_submission_timer.cancel();
@@ -466,22 +493,25 @@ void compaction_manager::submit(column_family* cf) {
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(cf, get_candidates(cf));
int weight = trim_to_compact(&cf, descriptor);
// Stop compaction task immediately if strategy is satisfied or job cannot run in parallel.
if (descriptor.sstables.empty() || !can_register_weight(&cf, weight, cs.parallel_compaction())) {
if (descriptor.sstables.empty() || !can_proceed(task)) {
_stats.pending_tasks--;
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
if (!can_register_weight(&cf, weight)) {
_stats.pending_tasks--;
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
postpone_compaction_for_column_family(&cf);
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto compacting = compacting_sstable_registration(this, descriptor.sstables);
auto c_weight = compaction_weight_registration(this, &cf, weight);
descriptor.weight_registration = compaction_weight_registration(this, &cf, weight);
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
_stats.pending_tasks--;
_stats.active_tasks++;
return cf.run_compaction(std::move(descriptor))
.then_wrapped([this, task, compacting = std::move(compacting), c_weight = std::move(c_weight)] (future<> f) mutable {
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
_stats.active_tasks--;
if (!can_proceed(task)) {
@@ -579,11 +609,12 @@ future<> compaction_manager::remove(column_family* cf) {
task->stopping = true;
}
}
_postponed.erase(boost::remove(_postponed, cf), _postponed.end());
// Wait for the termination of an ongoing compaction on cf, if any.
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) {
return this->task_stop(task);
}).then([this, cf, tasks_to_stop] {
_weight_tracker.erase(cf);
_compaction_locks.erase(cf);
});
}
@@ -605,3 +636,8 @@ void compaction_manager::stop_compaction(sstring type) {
}
}
}
// Called by the compaction procedure once all compaction work is done (and
// before output sstables are sealed), so the weight lock is released as early
// as possible and a waiting compaction of the same weight can start promptly.
void compaction_manager::on_compaction_complete(compaction_weight_registration& weight_registration) {
// Order matters: release the weight first, then wake the reevaluation fiber
// so a postponed job for this weight can actually acquire it.
weight_registration.deregister();
reevalute_postponed_compactions();
}

View File

@@ -73,9 +73,13 @@ private:
// a sstable from being compacted twice.
std::unordered_set<sstables::shared_sstable> _compacting_sstables;
// Keep track of weight of ongoing compaction for each column family.
// That's used to allow parallel compaction on the same column family.
std::unordered_map<column_family*, std::unordered_set<int>> _weight_tracker;
future<> _waiting_reevalution = make_ready_future<>();
condition_variable _postponed_reevaluation;
// column families that wait for compaction but had its submission postponed due to ongoing compaction.
std::vector<column_family*> _postponed;
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
std::unordered_set<int> _weight_tracker;
// Purpose is to serialize major compaction across all column families, so as to
// reduce disk space requirement.
@@ -93,14 +97,13 @@ private:
private:
future<> task_stop(lw_shared_ptr<task> task);
// Return true if weight is not registered. If parallel_compaction is not
// true, only one weight is allowed to be registered.
bool can_register_weight(column_family* cf, int weight, bool parallel_compaction);
// Return true if weight is not registered.
bool can_register_weight(column_family* cf, int weight);
// Register weight for a column family. Do that only if can_register_weight()
// returned true.
void register_weight(column_family* cf, int weight);
void register_weight(int weight);
// Deregister weight for a column family.
void deregister_weight(column_family* cf, int weight);
void deregister_weight(int weight);
// If weight of compaction job is taken, it will be trimmed until its new
// weight is not taken or its size is equal to minimum threshold.
@@ -125,6 +128,12 @@ private:
// stop of transportation services. It cannot make progress anyway.
// Returns true if error is judged not fatal, and compaction can be retried.
inline bool maybe_stop_on_error(future<> f);
void postponed_compactions_reevaluation();
void reevalute_postponed_compactions();
// Postpone compaction for a column family that couldn't be executed due to ongoing
// similar-sized compaction.
void postpone_compaction_for_column_family(column_family* cf);
public:
compaction_manager();
~compaction_manager();
@@ -188,6 +197,11 @@ public:
// Stops ongoing compaction of a given type.
void stop_compaction(sstring type);
// Called by compaction procedure to release the weight lock assigned to it, such that
// another compaction waiting on same weight can start as soon as possible. That's usually
// called before compaction seals sstable and such and after all compaction work is done.
void on_compaction_complete(compaction_weight_registration& weight_registration);
friend class compacting_sstable_registration;
friend class compaction_weight_registration;
};