diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 83031e698d..dffaad1bb5 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -124,6 +124,19 @@ std::vector compaction_manager::get_candidates(const c return candidates; } +void compaction_manager::register_compacting_sstables(const std::vector& sstables) { + for (auto& sst : sstables) { + _compacting_sstables.insert(sst); + } +} + +void compaction_manager::deregister_compacting_sstables(const std::vector& sstables) { + // Remove compacted sstables from the set of compacting sstables. + for (auto& sst : sstables) { + _compacting_sstables.erase(sst); + } +} + lw_shared_ptr compaction_manager::task_start(column_family* cf, bool cleanup) { // NOTE: Compaction code runs in parallel to the rest of the system. // When it's time to shutdown, we need to prevent any new compaction @@ -148,18 +161,12 @@ lw_shared_ptr compaction_manager::task_start(column_fa // Created to erase sstables from _compacting_sstables after compaction finishes. std::vector sstables_to_compact; int weight = -1; - auto keep_track_of_compacting_sstables = [this, &sstables_to_compact, &descriptor] { - sstables_to_compact.reserve(descriptor.sstables.size()); - for (auto& sst : descriptor.sstables) { - sstables_to_compact.push_back(sst); - _compacting_sstables.insert(sst); - } - }; future<> operation = make_ready_future<>(); if (task->cleanup) { descriptor = sstables::compaction_descriptor(std::move(candidates)); - keep_track_of_compacting_sstables(); + sstables_to_compact = descriptor.sstables; + register_compacting_sstables(sstables_to_compact); operation = cf.cleanup_sstables(std::move(descriptor)); } else { sstables::compaction_strategy cs = cf.get_compaction_strategy(); @@ -173,7 +180,8 @@ lw_shared_ptr compaction_manager::task_start(column_fa descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name()); return make_ready_future(stop_iteration::yes); } - keep_track_of_compacting_sstables(); + sstables_to_compact = descriptor.sstables; + register_compacting_sstables(sstables_to_compact); cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}", descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name()); operation = cf.run_compaction(std::move(descriptor)); @@ -186,10 +194,7 @@ lw_shared_ptr compaction_manager::task_start(column_fa task->compaction_retry.reset(); return make_ready_future<>(); }).then_wrapped([this, task, weight, sstables_to_compact = std::move(sstables_to_compact)] (future<> f) { - // Remove compacted sstables from the set of compacting sstables. - for (auto& sst : sstables_to_compact) { - _compacting_sstables.erase(sst); - } + deregister_compacting_sstables(sstables_to_compact); if (weight != -1) { deregister_weight(task->compacting_cf, weight); } diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index bcec997c36..3be09fc70a 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -93,6 +93,9 @@ private: // Get candidates for compaction strategy, which are all sstables but the ones being compacted. std::vector get_candidates(const column_family& cf); + + void register_compacting_sstables(const std::vector& sstables); + void deregister_compacting_sstables(const std::vector& sstables); public: compaction_manager(); ~compaction_manager();