compaction: move rewrite_sstables to the compaction_manager
There is no reason why the table code has to be aware of the efforts of rewriting (cleanup, scrub, upgrade) an SSTable versus compacting it. Rewrite is special, because we need to do it one SSTable at a time, without lumping it together. However, the compaction manager is totally capable of doing that itself. If we do that, the special "table::rewrite_sstables" can be killed. This code would maybe be better off as a thread, where we wouldn't need to keep state. However there are some methods like maybe_stop_on_error() that expect a future so I am leaving this be for now. This is a cleanup that can be done later. Signed-off-by: Glauber Costa <glauber@scylladb.com> Message-Id: <20200401162722.28780-2-glauber@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
ac43a9e2aa
commit
463d0ab37c
@@ -839,11 +839,6 @@ public:
|
||||
future<> compact_all_sstables();
|
||||
// Compact all sstables provided in the vector.
|
||||
future<> compact_sstables(sstables::compaction_descriptor descriptor);
|
||||
// Compact all sstables provided in the descriptor one-by-one.
|
||||
//
|
||||
// Will call `compact_sstables()` for each sstable. Use by compaction
|
||||
// types such as cleanup or upgrade.
|
||||
future<> rewrite_sstables(sstables::compaction_descriptor descriptor);
|
||||
|
||||
future<bool> snapshot_exists(sstring name);
|
||||
|
||||
|
||||
@@ -596,17 +596,28 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
task->compacting_cf = cf;
|
||||
task->type = options.type();
|
||||
_tasks.push_back(task);
|
||||
_stats.pending_tasks++;
|
||||
|
||||
task->compaction_done = repeat([this, task, options, get_func = std::move(get_func)] () mutable {
|
||||
auto sstables = std::make_unique<std::vector<sstables::shared_sstable>>(get_func(*cf));
|
||||
auto sstables_ptr = sstables.get();
|
||||
_stats.pending_tasks += sstables->size();
|
||||
|
||||
task->compaction_done = do_until([sstables_ptr] { return sstables_ptr->empty(); }, [this, task, options, sstables_ptr] () mutable {
|
||||
|
||||
// FIXME: lock cf here
|
||||
if (!can_proceed(task)) {
|
||||
_stats.pending_tasks--;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto sst = sstables_ptr->back();
|
||||
sstables_ptr->pop_back();
|
||||
|
||||
return repeat([this, task, options, sst = std::move(sst)] () mutable {
|
||||
column_family& cf = *task->compacting_cf;
|
||||
sstables::compaction_descriptor descriptor = sstables::compaction_descriptor(get_func(cf));
|
||||
descriptor.options = options;
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, sstable_level,
|
||||
sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
|
||||
auto compacting = make_lw_shared<compacting_sstable_registration>(this, descriptor.sstables);
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
@@ -619,7 +630,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
|
||||
return cf.rewrite_sstables(std::move(descriptor));
|
||||
return cf.run_compaction(std::move(descriptor));
|
||||
});
|
||||
}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
task->compaction_running = false;
|
||||
@@ -639,7 +650,9 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
reevaluate_postponed_compactions();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
}).finally([this, task] {
|
||||
});
|
||||
}).finally([this, task, sstables = std::move(sstables)] {
|
||||
_stats.pending_tasks -= sstables->size();
|
||||
_tasks.remove(task);
|
||||
});
|
||||
|
||||
|
||||
24
table.cc
24
table.cc
@@ -1327,30 +1327,6 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> table::rewrite_sstables(sstables::compaction_descriptor descriptor) {
|
||||
return do_with(std::move(descriptor.sstables), std::move(descriptor.release_exhausted),
|
||||
[this, options = descriptor.options] (auto& sstables, auto& release_fn) {
|
||||
return do_for_each(sstables, [this, &release_fn, options] (auto& sst) {
|
||||
// this semaphore ensures that only one rewrite will run per shard.
|
||||
// That's to prevent node from running out of space when almost all sstables
|
||||
// need rewrite, so if sstables are rewritten in parallel, we may need almost
|
||||
// twice the disk space used by those sstables.
|
||||
static thread_local named_semaphore sem(1, named_semaphore_exception_factory{"rewrite sstables"});
|
||||
|
||||
return with_semaphore(sem, 1, [this, &sst, &release_fn, options] {
|
||||
// release reference to sstables cleaned up, otherwise space usage from their data and index
|
||||
// components cannot be reclaimed until all of them are cleaned.
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
auto descriptor = sstables::compaction_descriptor({ std::move(sst) }, sstable_level,
|
||||
sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
descriptor.release_exhausted = release_fn;
|
||||
return this->compact_sstables(std::move(descriptor));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Note: We assume that the column_family does not get destroyed during compaction.
|
||||
future<>
|
||||
table::compact_all_sstables() {
|
||||
|
||||
Reference in New Issue
Block a user