diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 26d310e6f0..26b66bfa53 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -188,7 +188,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const { return 0; } auto largest_fan_in = std::ranges::max(_tasks | boost::adaptors::transformed([] (auto& task) { - return task->compaction_running() ? task->compaction_data().compaction_fan_in : 0; + return task.compaction_running() ? task.compaction_data().compaction_fan_in : 0; })); // conservatively limit fan-in threshold to 32, such that tons of small sstables won't accumulate if // running major on a leveled table, which can even have more than one thousand files. @@ -599,9 +599,9 @@ requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&& } future compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args) { auto task_executor = seastar::make_shared(*this, do_throw_if_stopping, std::forward(args)...); - _tasks.push_back(task_executor); - auto unregister_task = defer([this, task_executor] { - _tasks.remove(task_executor); + _tasks.push_back(*task_executor); + auto unregister_task = defer([task_executor] { + task_executor->unlink(); task_executor->switch_state(compaction_task_executor::state::none); }); @@ -903,10 +903,10 @@ public: explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {} bool has_ongoing_compaction(table_state& table_s) const noexcept override { - return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr& task) { - return task->compaction_running() - && task->compacting_table()->schema()->ks_name() == s->ks_name() - && task->compacting_table()->schema()->cf_name() == s->cf_name(); + return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const compaction_task_executor& task) { + return task.compaction_running() + && task.compacting_table()->schema()->ks_name() == s->ks_name() + && task.compacting_table()->schema()->cf_name() == s->cf_name(); }); } @@ -1092,9 +1092,12 @@ future<> compaction_manager::stop_tasks(std::vector compaction_manager::stop_ongoing_compactions(sstring reason, table_state* t, std::optional type_opt) noexcept { try { auto ongoing_compactions = get_compactions(t).size(); - auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) { - return (!t || task->compacting_table() == t) && (!type_opt || task->compaction_type() == *type_opt); - })); + auto tasks = _tasks + | std::views::filter([t, type_opt] (const auto& task) { + return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt); + }) + | std::views::transform([] (auto& task) { return task.shared_from_this(); }) + | std::ranges::to>>(); logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; if (cmlog.is_enabled(level)) { std::string scope = ""; @@ -2018,7 +2021,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t, tasks::task_info info) { auto check_for_cleanup = [this, &t] { return boost::algorithm::any_of(_tasks, [&t] (auto& task) { - return task->compacting_table() == &t && task->compaction_type() == sstables::compaction_type::Cleanup; + return task.compacting_table() == &t && task.compaction_type() == sstables::compaction_type::Cleanup; }); }; if (check_for_cleanup()) { @@ -2198,11 +2201,11 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept { auto found = false; sstring msg; for (auto& task : _tasks) { - if (task->compacting_table() == &t) { + if (task.compacting_table() == &t) { if (!msg.empty()) { msg += "\n"; } - msg += format("Found {} after remove", *task.get()); + msg += format("Found {} after remove", task); found = true; } } @@ -2213,25 +2216,25 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept { } const std::vector compaction_manager::get_compactions(table_state* t) const { - auto to_info = [] (const shared_ptr& task) { + auto to_info = [] (const compaction_task_executor& task) { sstables::compaction_info ret; - ret.compaction_uuid = task->compaction_data().compaction_uuid; - ret.type = task->compaction_type(); - ret.ks_name = task->compacting_table()->schema()->ks_name(); - ret.cf_name = task->compacting_table()->schema()->cf_name(); - ret.total_partitions = task->compaction_data().total_partitions; - ret.total_keys_written = task->compaction_data().total_keys_written; + ret.compaction_uuid = task.compaction_data().compaction_uuid; + ret.type = task.compaction_type(); + ret.ks_name = task.compacting_table()->schema()->ks_name(); + ret.cf_name = task.compacting_table()->schema()->cf_name(); + ret.total_partitions = task.compaction_data().total_partitions; + ret.total_keys_written = task.compaction_data().total_keys_written; return ret; }; using ret = std::vector; - return boost::copy_range(_tasks | boost::adaptors::filtered([t] (const shared_ptr& task) { - return (!t || task->compacting_table() == t) && task->compaction_running(); + return boost::copy_range(_tasks | boost::adaptors::filtered([t] (const compaction_task_executor& task) { + return (!t || task.compacting_table() == t) && task.compaction_running(); }) | boost::adaptors::transformed(to_info)); } bool compaction_manager::has_table_ongoing_compaction(const table_state& t) const { - return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr& task) { - return task->compacting_table() == &t && task->compaction_running(); + return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const compaction_task_executor& task) { + return task.compacting_table() == &t && task.compaction_running(); }); }; @@ -2269,8 +2272,8 @@ future<> compaction_manager::stop_compaction(sstring type, table_state* table) { void compaction_manager::propagate_replacement(table_state& t, const std::vector& removed, const std::vector& added) { for (auto& task : _tasks) { - if (task->compacting_table() == &t && task->compaction_running()) { - task->compaction_data().pending_replacements.push_back({ removed, added }); + if (task.compacting_table() == &t && task.compaction_running()) { + task.compaction_data().pending_replacements.push_back({ removed, added }); } } } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 65e9f52890..d3d84901ca 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -87,8 +87,13 @@ public: private: shared_ptr _task_manager_module; + + using compaction_task_executor_list_type = bi::list< + compaction_task_executor, + bi::base_hook>>, + bi::constant_time_size>; // compaction manager may have N fibers to allow parallel compaction per shard. - std::list> _tasks; + compaction_task_executor_list_type _tasks; // Possible states in which the compaction manager can be found. // @@ -464,7 +469,9 @@ public: namespace compaction { -class compaction_task_executor : public enable_shared_from_this { +class compaction_task_executor + : public enable_shared_from_this + , public boost::intrusive::list_base_hook> { public: enum class state { none, // initial and final state diff --git a/scylla-gdb.py b/scylla-gdb.py index 6a8b721eed..92082943a9 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -5300,7 +5300,11 @@ class scylla_compaction_tasks(gdb.Command): pass task_hist = histogram(print_indicators=False) - task_list = list(std_list(cm['_tasks'])) + try: + task_list = list(intrusive_list(cm['_tasks'])) + except gdb.error: # 6.2 compatibility + task_list = list(std_list(cm['_tasks'])) + for task in task_list: task = seastar_shared_ptr(task).get().dereference() schema = schema_ptr(task['_compacting_table'].dereference()['_schema']) diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 4b651e79e3..661dd6a3e3 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -532,11 +532,12 @@ void test_env_compaction_manager::propagate_replacement(compaction::table_state& // Test version of compaction_manager::perform_compaction<>() future<> test_env_compaction_manager::perform_compaction(shared_ptr task) { - _cm._tasks.push_back(task); - auto unregister_task = defer([this, task] { - if (_cm._tasks.remove(task) == 0) { + _cm._tasks.push_back(*task); + auto unregister_task = defer([task] { + if (!task->is_linked()) { testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", task->compaction_data().compaction_uuid); } + task->unlink(); task->switch_state(compaction_task_executor::state::none); }); co_await task->run_compaction();