compaction_manager: use single semaphore for serialization of maintenance compactions

We have three semaphores for serialization of maintenance ops.
1) _rewrite_sstables_sem: for scrub, cleanup and upgrade.
2) _major_compaction_sem: for major compaction
3) _custom_job_sem: for reshape, resharding and offstrategy

scrub, cleanup and upgrade should be serialized with major compaction,
so the rewrite semaphore should be merged into the major one.

offstrategy is also a maintenance op that should be serialized
with others, to reduce compaction aggressiveness and space
requirement.

resharding is a one-off operation, so it can be merged there too.
the same applies to reshape, which can take a long time; not
serializing it with other maintenance activity can lead to
exhaustion of resources and a high space requirement.

let's have a single semaphore to guarantee their serialization.

deadlock isn't an issue because locks are always taken in the same
order: the semaphore first, then the per-table lock.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20211201182046.100942-1-raphaelsc@scylladb.com>
Raphael S. Carvalho
2021-12-01 15:20:46 -03:00
committed by Avi Kivity
parent 426fc8db3a
commit 6737c88045
2 changed files with 7 additions and 10 deletions
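
The serialization and lock-ordering argument in the commit message can be sketched outside Seastar. The block below is a minimal illustration in standard C++17 with threads, not the Seastar code touched by this commit. simple_semaphore, semaphore_units, table_state, maintenance_serializer and run_maintenance_demo are hypothetical names introduced only for this sketch. It assumes that "exclusive" stands for the per-table write lock (major compaction, rewrite_sstables) and "shared" for the read lock (custom jobs, offstrategy).

```cpp
// Standard C++17 sketch; NOT Seastar code. All type and function names are hypothetical.
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

// Tiny counting semaphore built from a mutex and a condition variable.
class simple_semaphore {
    std::mutex _m;
    std::condition_variable _cv;
    int _units;
public:
    explicit simple_semaphore(int units) : _units(units) {}
    void wait() {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait(lk, [this] { return _units > 0; });
        --_units;
    }
    void signal() {
        {
            std::lock_guard<std::mutex> lk(_m);
            ++_units;
        }
        _cv.notify_one();
    }
};

// RAII holder: takes one unit on construction, returns it on destruction.
struct semaphore_units {
    simple_semaphore& sem;
    explicit semaphore_units(simple_semaphore& s) : sem(s) { sem.wait(); }
    ~semaphore_units() { sem.signal(); }
    semaphore_units(const semaphore_units&) = delete;
    semaphore_units& operator=(const semaphore_units&) = delete;
};

// Stand-in for the per-table lock kept in the compaction state.
struct table_state {
    std::shared_mutex lock;
};

class maintenance_serializer {
    simple_semaphore _maintenance_ops_sem{1};  // one maintenance op at a time
    std::atomic<int> _active{0};
    std::atomic<int> _max_active{0};

    // Runs the job while recording how many jobs are inside at once.
    template <typename Job>
    void tracked(Job& job) {
        int now = ++_active;
        int prev = _max_active.load();
        while (prev < now && !_max_active.compare_exchange_weak(prev, now)) {}
        job();
        --_active;
    }
public:
    // Every caller takes the semaphore first and the table lock second,
    // so no two callers can hold them in opposite order.
    template <typename Job>
    void run(table_state& t, bool exclusive, Job job) {
        semaphore_units units(_maintenance_ops_sem);
        if (exclusive) {
            std::unique_lock<std::shared_mutex> guard(t.lock);
            tracked(job);
        } else {
            std::shared_lock<std::shared_mutex> guard(t.lock);
            tracked(job);
        }
    }
    int max_active() const { return _max_active.load(); }
};

// Starts n_ops maintenance jobs on two tables and reports the highest
// number of jobs that were ever running at the same time.
int run_maintenance_demo(int n_ops) {
    assert(n_ops >= 0);
    maintenance_serializer mgr;
    table_state t1, t2;
    std::vector<std::thread> workers;
    for (int i = 0; i < n_ops; ++i) {
        workers.emplace_back([&, i] {
            table_state& t = (i % 2 == 0) ? t1 : t2;
            mgr.run(t, i % 3 == 0, [] { std::this_thread::yield(); });
        });
    }
    for (auto& w : workers) {
        w.join();
    }
    return mgr.max_active();
}
```

With a single semaphore of one unit, run_maintenance_demo reports at most one job in flight no matter which table or lock mode each job uses. In this sketch the per-table lock only matters for work that does not go through the semaphore, which corresponds to regular compaction in the commit.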


@@ -275,7 +275,7 @@ future<> compaction_manager::perform_major_compaction(table* t) {
// first take major compaction semaphore, then exclusely take compaction lock for table.
// it cannot be the other way around, or minor compaction for this table would be
// prevented while an ongoing major compaction doesn't release the semaphore.
-task->compaction_done = with_semaphore(_major_compaction_sem, 1, [this, task, t] {
+task->compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, t] {
return with_lock(task->compaction_state.lock.for_write(), [this, task, t] {
_stats.active_tasks++;
if (!can_proceed(task)) {
@@ -330,7 +330,7 @@ future<> compaction_manager::run_custom_job(table* t, sstables::compaction_type
auto job_ptr = std::make_unique<noncopyable_function<future<>(sstables::compaction_data&)>>(std::move(job));
-task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, &job = *job_ptr] () mutable {
+task->compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, &job = *job_ptr] () mutable {
// take read lock for table, so major compaction and resharding can't proceed in parallel.
return with_lock(task->compaction_state.lock.for_read(), [this, task, &job] () mutable {
_stats.active_tasks++;
@@ -738,7 +738,7 @@ void compaction_manager::submit_offstrategy(table* t) {
_stats.pending_tasks--;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
-return with_semaphore(_custom_job_sem, 1, [this, task, t] () mutable {
+return with_semaphore(_maintenance_ops_sem, 1, [this, task, t] () mutable {
return with_lock(task->compaction_state.lock.for_read(), [this, task, t] () mutable {
_stats.pending_tasks--;
if (!can_proceed(task)) {
@@ -819,7 +819,7 @@ future<> compaction_manager::rewrite_sstables(table* t, sstables::compaction_typ
compacting->release_compacting(exhausted_sstables);
};
-return with_semaphore(_rewrite_sstables_sem, 1, [this, task, &t, descriptor = std::move(descriptor)] () mutable {
+return with_semaphore(_maintenance_ops_sem, 1, [this, task, &t, descriptor = std::move(descriptor)] () mutable {
// Take write lock for table to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
return with_lock(_compaction_state[&t].lock.for_write(), [this, task, &t, descriptor = std::move(descriptor)] () mutable {
_stats.pending_tasks--;


@@ -150,14 +150,11 @@ private:
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
std::unordered_set<int> _weight_tracker;
-// Purpose is to serialize major compaction across all tables, so to
-// reduce disk space requirement.
-semaphore _major_compaction_sem{1};
std::unordered_map<table*, compaction_state> _compaction_state;
-semaphore _custom_job_sem{1};
-seastar::named_semaphore _rewrite_sstables_sem = {1, named_semaphore_exception_factory{"rewrite sstables"}};
+// Purpose is to serialize all maintenance (non regular) compaction activity to reduce aggressiveness and space requirement.
+// If the operation must be serialized with regular, then the per-table write lock must be taken.
+seastar::named_semaphore _maintenance_ops_sem = {1, named_semaphore_exception_factory{"maintenance operation"}};
std::function<void()> compaction_submission_callback();
// all registered tables are reevaluated at a constant interval.