diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index e50e116b68..f6d9b807ab 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -629,10 +629,11 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa _tasks.push_back(task); auto sstables = std::make_unique>(get_func(*cf)); + auto compacting = make_lw_shared(this, *sstables); auto sstables_ptr = sstables.get(); _stats.pending_tasks += sstables->size(); - task->compaction_done = do_until([sstables_ptr] { return sstables_ptr->empty(); }, [this, task, options, sstables_ptr] () mutable { + task->compaction_done = do_until([sstables_ptr] { return sstables_ptr->empty(); }, [this, task, options, sstables_ptr, compacting] () mutable { // FIXME: lock cf here if (!can_proceed(task)) { @@ -642,7 +643,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa auto sst = sstables_ptr->back(); sstables_ptr->pop_back(); - return repeat([this, task, options, sst = std::move(sst)] () mutable { + return repeat([this, task, options, sst = std::move(sst), compacting] () mutable { column_family& cf = *task->compacting_cf; auto sstable_level = sst->get_sstable_level(); auto run_identifier = sst->run_identifier(); @@ -650,7 +651,6 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa auto descriptor = sstables::compaction_descriptor({ sst }, cf.get_sstable_set(), service::get_local_compaction_priority(), sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options); - auto compacting = make_lw_shared(this, descriptor.sstables); // Releases reference to cleaned sstable such that respective used disk space can be freed. descriptor.release_exhausted = [compacting] (const std::vector& exhausted_sstables) { compacting->release_compacting(exhausted_sstables); @@ -664,7 +664,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable { return cf.run_compaction(std::move(descriptor)); }); - }).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable { + }).then_wrapped([this, task, compacting] (future<> f) mutable { task->compaction_running = false; _stats.active_tasks--; if (!can_proceed(task)) {