compaction_manager: use single semaphore for serialization of maintenance compactions
We have three semaphores for serialization of maintenance ops. 1) _rewrite_sstables_sem: for scrub, cleanup and upgrade. 2) _major_compaction_sem: for major 3) _custom_job_sem: for reshape, resharding and offstrategy scrub, cleanup and upgrade should be serialized with major, so rewrite sem should be merged into major one. offstrategy is also a maintenance op that should be serialized with others, to reduce compaction aggressiveness and space requirement. resharding is one-off operation, so can be merged there too. the same applies for reshape, which can take long and not serializing it with other maintenance activity can lead to exhaustion of resources and high space requirement. let's have a single semaphore to guarantee their serialization. deadlock isn't an issue because locks are always taken in same order. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Message-Id: <20211201182046.100942-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
426fc8db3a
commit
6737c88045
@@ -275,7 +275,7 @@ future<> compaction_manager::perform_major_compaction(table* t) {
|
||||
// first take major compaction semaphore, then exclusely take compaction lock for table.
|
||||
// it cannot be the other way around, or minor compaction for this table would be
|
||||
// prevented while an ongoing major compaction doesn't release the semaphore.
|
||||
task->compaction_done = with_semaphore(_major_compaction_sem, 1, [this, task, t] {
|
||||
task->compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, t] {
|
||||
return with_lock(task->compaction_state.lock.for_write(), [this, task, t] {
|
||||
_stats.active_tasks++;
|
||||
if (!can_proceed(task)) {
|
||||
@@ -330,7 +330,7 @@ future<> compaction_manager::run_custom_job(table* t, sstables::compaction_type
|
||||
|
||||
auto job_ptr = std::make_unique<noncopyable_function<future<>(sstables::compaction_data&)>>(std::move(job));
|
||||
|
||||
task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, &job = *job_ptr] () mutable {
|
||||
task->compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, &job = *job_ptr] () mutable {
|
||||
// take read lock for table, so major compaction and resharding can't proceed in parallel.
|
||||
return with_lock(task->compaction_state.lock.for_read(), [this, task, &job] () mutable {
|
||||
_stats.active_tasks++;
|
||||
@@ -738,7 +738,7 @@ void compaction_manager::submit_offstrategy(table* t) {
|
||||
_stats.pending_tasks--;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return with_semaphore(_custom_job_sem, 1, [this, task, t] () mutable {
|
||||
return with_semaphore(_maintenance_ops_sem, 1, [this, task, t] () mutable {
|
||||
return with_lock(task->compaction_state.lock.for_read(), [this, task, t] () mutable {
|
||||
_stats.pending_tasks--;
|
||||
if (!can_proceed(task)) {
|
||||
@@ -819,7 +819,7 @@ future<> compaction_manager::rewrite_sstables(table* t, sstables::compaction_typ
|
||||
compacting->release_compacting(exhausted_sstables);
|
||||
};
|
||||
|
||||
return with_semaphore(_rewrite_sstables_sem, 1, [this, task, &t, descriptor = std::move(descriptor)] () mutable {
|
||||
return with_semaphore(_maintenance_ops_sem, 1, [this, task, &t, descriptor = std::move(descriptor)] () mutable {
|
||||
// Take write lock for table to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
|
||||
return with_lock(_compaction_state[&t].lock.for_write(), [this, task, &t, descriptor = std::move(descriptor)] () mutable {
|
||||
_stats.pending_tasks--;
|
||||
|
||||
@@ -150,14 +150,11 @@ private:
|
||||
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
|
||||
std::unordered_set<int> _weight_tracker;
|
||||
|
||||
// Purpose is to serialize major compaction across all tables, so to
|
||||
// reduce disk space requirement.
|
||||
semaphore _major_compaction_sem{1};
|
||||
|
||||
std::unordered_map<table*, compaction_state> _compaction_state;
|
||||
|
||||
semaphore _custom_job_sem{1};
|
||||
seastar::named_semaphore _rewrite_sstables_sem = {1, named_semaphore_exception_factory{"rewrite sstables"}};
|
||||
// Purpose is to serialize all maintenance (non regular) compaction activity to reduce aggressiveness and space requirement.
|
||||
// If the operation must be serialized with regular, then the per-table write lock must be taken.
|
||||
seastar::named_semaphore _maintenance_ops_sem = {1, named_semaphore_exception_factory{"maintenance operation"}};
|
||||
|
||||
std::function<void()> compaction_submission_callback();
|
||||
// all registered tables are reevaluated at a constant interval.
|
||||
|
||||
Reference in New Issue
Block a user