compaction: move rewrite_sstables to the compaction_manager

There is no reason why the table code has to be aware of the efforts of
rewriting (cleanup, scrub, upgrade) an SSTable versus compacting it.

Rewrite is special, because we need to do it one SSTable at a time,
without lumping them together. However, the compaction manager is totally
capable of doing that itself. If we do that, the special
"table::rewrite_sstables" can be killed.

This code might be better off as a thread, where we wouldn't need
to keep state. However, there are some methods, like maybe_stop_on_error(),
that expect a future, so I am leaving this as is for now. This is a cleanup
that can be done later.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <20200401162722.28780-2-glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2020-04-01 12:27:21 -04:00
committed by Avi Kivity
parent ac43a9e2aa
commit 463d0ab37c
3 changed files with 21 additions and 37 deletions

View File

@@ -839,11 +839,6 @@ public:
future<> compact_all_sstables();
// Compact all sstables provided in the vector.
future<> compact_sstables(sstables::compaction_descriptor descriptor);
// Compact all sstables provided in the descriptor one-by-one.
//
// Will call `compact_sstables()` for each sstable. Use by compaction
// types such as cleanup or upgrade.
future<> rewrite_sstables(sstables::compaction_descriptor descriptor);
future<bool> snapshot_exists(sstring name);

View File

@@ -596,17 +596,28 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
task->compacting_cf = cf;
task->type = options.type();
_tasks.push_back(task);
_stats.pending_tasks++;
task->compaction_done = repeat([this, task, options, get_func = std::move(get_func)] () mutable {
auto sstables = std::make_unique<std::vector<sstables::shared_sstable>>(get_func(*cf));
auto sstables_ptr = sstables.get();
_stats.pending_tasks += sstables->size();
task->compaction_done = do_until([sstables_ptr] { return sstables_ptr->empty(); }, [this, task, options, sstables_ptr] () mutable {
// FIXME: lock cf here
if (!can_proceed(task)) {
_stats.pending_tasks--;
return make_ready_future<stop_iteration>(stop_iteration::yes);
return make_ready_future<>();
}
auto sst = sstables_ptr->back();
sstables_ptr->pop_back();
return repeat([this, task, options, sst = std::move(sst)] () mutable {
column_family& cf = *task->compacting_cf;
sstables::compaction_descriptor descriptor = sstables::compaction_descriptor(get_func(cf));
descriptor.options = options;
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
auto descriptor = sstables::compaction_descriptor({ sst }, sstable_level,
sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
auto compacting = make_lw_shared<compacting_sstable_registration>(this, descriptor.sstables);
// Releases reference to cleaned sstable such that respective used disk space can be freed.
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
@@ -619,7 +630,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
return cf.rewrite_sstables(std::move(descriptor));
return cf.run_compaction(std::move(descriptor));
});
}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
task->compaction_running = false;
@@ -639,7 +650,9 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
reevaluate_postponed_compactions();
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
}).finally([this, task] {
});
}).finally([this, task, sstables = std::move(sstables)] {
_stats.pending_tasks -= sstables->size();
_tasks.remove(task);
});

View File

@@ -1327,30 +1327,6 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
});
}
future<> table::rewrite_sstables(sstables::compaction_descriptor descriptor) {
return do_with(std::move(descriptor.sstables), std::move(descriptor.release_exhausted),
[this, options = descriptor.options] (auto& sstables, auto& release_fn) {
return do_for_each(sstables, [this, &release_fn, options] (auto& sst) {
// this semaphore ensures that only one rewrite will run per shard.
// That's to prevent node from running out of space when almost all sstables
// need rewrite, so if sstables are rewritten in parallel, we may need almost
// twice the disk space used by those sstables.
static thread_local named_semaphore sem(1, named_semaphore_exception_factory{"rewrite sstables"});
return with_semaphore(sem, 1, [this, &sst, &release_fn, options] {
// release reference to sstables cleaned up, otherwise space usage from their data and index
// components cannot be reclaimed until all of them are cleaned.
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
auto descriptor = sstables::compaction_descriptor({ std::move(sst) }, sstable_level,
sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
descriptor.release_exhausted = release_fn;
return this->compact_sstables(std::move(descriptor));
});
});
});
}
// Note: We assume that the column_family does not get destroyed during compaction.
future<>
table::compact_all_sstables() {