compaction_manager: extract "stop tasks" from stop_ongoing_compactions() into new function

Procedure will be reused to stop a list of tasks

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2021-11-04 12:36:49 -03:00
parent 158f47dfc7
commit 0643faafd7
2 changed files with 12 additions and 7 deletions

View File

@@ -485,13 +485,8 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
_postponed.insert(cf);
}
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason);
// Wait for each task handler to stop. Copy list because task remove itself
// from the list when done.
auto tasks = _tasks;
return do_with(std::move(tasks), [this, reason] (std::list<lw_shared_ptr<task>>& tasks) {
future<> compaction_manager::stop_tasks(std::vector<lw_shared_ptr<task>> tasks, sstring reason) {
return do_with(std::move(tasks), [this, reason] (std::vector<lw_shared_ptr<task>>& tasks) {
return parallel_for_each(tasks, [this, reason] (auto& task) {
return this->task_stop(task, reason).then_wrapped([](future <> f) {
try {
@@ -507,6 +502,15 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
});
}
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason);
// Wait for each task handler to stop. Copy list because task remove itself
// from the list when done.
auto tasks = boost::copy_range<std::vector<lw_shared_ptr<task>>>(_tasks);
return stop_tasks(std::move(tasks), std::move(reason));
}
future<> compaction_manager::drain() {
_state = state::disabled;
return stop_ongoing_compactions("drain");

View File

@@ -134,6 +134,7 @@ private:
static constexpr std::chrono::seconds periodic_compaction_submission_interval() { return std::chrono::seconds(3600); }
private:
future<> task_stop(lw_shared_ptr<task> task, sstring reason);
future<> stop_tasks(std::vector<lw_shared_ptr<task>> tasks, sstring reason);
// Return the largest fan-in of currently running compactions
unsigned current_compaction_fan_in_threshold() const;