compaction/compaction_manager: make _tasks an intrusive list

_tasks is currently std::list<shared_ptr<compaction_task_executor>>, but
it has no role in keeping the instances alive, this is done by the
fibers which create the task (and pin a shared ptr instance).
This lends itself to an intrusive list, avoiding that extra
allocation upon push_back().
Using an intrusive list also makes it simpler and much cheaper (O(1) vs.
O(N)) to remove tasks from the _tasks list. This will be made use of in
the next patch.

Code using _task has to be updated because the value_type changes from
shared_ptr<compaction_task_executor> to compaction_task_executor&.
This commit is contained in:
Botond Dénes
2024-10-30 09:07:09 -04:00
committed by Benny Halevy
parent 39b55bd3a0
commit e942c074f2
4 changed files with 48 additions and 33 deletions

View File

@@ -188,7 +188,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const {
return 0;
}
auto largest_fan_in = std::ranges::max(_tasks | boost::adaptors::transformed([] (auto& task) {
return task->compaction_running() ? task->compaction_data().compaction_fan_in : 0;
return task.compaction_running() ? task.compaction_data().compaction_fan_in : 0;
}));
// conservatively limit fan-in threshold to 32, such that tons of small sstables won't accumulate if
// running major on a leveled table, which can even have more than one thousand files.
@@ -599,9 +599,9 @@ requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&&
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args) {
auto task_executor = seastar::make_shared<TaskExecutor>(*this, do_throw_if_stopping, std::forward<Args>(args)...);
_tasks.push_back(task_executor);
auto unregister_task = defer([this, task_executor] {
_tasks.remove(task_executor);
_tasks.push_back(*task_executor);
auto unregister_task = defer([task_executor] {
task_executor->unlink();
task_executor->switch_state(compaction_task_executor::state::none);
});
@@ -903,10 +903,10 @@ public:
explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {}
bool has_ongoing_compaction(table_state& table_s) const noexcept override {
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction_task_executor>& task) {
return task->compaction_running()
&& task->compacting_table()->schema()->ks_name() == s->ks_name()
&& task->compacting_table()->schema()->cf_name() == s->cf_name();
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const compaction_task_executor& task) {
return task.compaction_running()
&& task.compacting_table()->schema()->ks_name() == s->ks_name()
&& task.compacting_table()->schema()->cf_name() == s->cf_name();
});
}
@@ -1092,9 +1092,12 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_e
future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_state* t, std::optional<sstables::compaction_type> type_opt) noexcept {
try {
auto ongoing_compactions = get_compactions(t).size();
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction_task_executor>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
return (!t || task->compacting_table() == t) && (!type_opt || task->compaction_type() == *type_opt);
}));
auto tasks = _tasks
| std::views::filter([t, type_opt] (const auto& task) {
return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt);
})
| std::views::transform([] (auto& task) { return task.shared_from_this(); })
| std::ranges::to<std::vector<shared_ptr<compaction_task_executor>>>();
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info;
if (cmlog.is_enabled(level)) {
std::string scope = "";
@@ -2018,7 +2021,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t, tasks::task_info info) {
auto check_for_cleanup = [this, &t] {
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
return task->compacting_table() == &t && task->compaction_type() == sstables::compaction_type::Cleanup;
return task.compacting_table() == &t && task.compaction_type() == sstables::compaction_type::Cleanup;
});
};
if (check_for_cleanup()) {
@@ -2198,11 +2201,11 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept {
auto found = false;
sstring msg;
for (auto& task : _tasks) {
if (task->compacting_table() == &t) {
if (task.compacting_table() == &t) {
if (!msg.empty()) {
msg += "\n";
}
msg += format("Found {} after remove", *task.get());
msg += format("Found {} after remove", task);
found = true;
}
}
@@ -2213,25 +2216,25 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept {
}
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(table_state* t) const {
auto to_info = [] (const shared_ptr<compaction_task_executor>& task) {
auto to_info = [] (const compaction_task_executor& task) {
sstables::compaction_info ret;
ret.compaction_uuid = task->compaction_data().compaction_uuid;
ret.type = task->compaction_type();
ret.ks_name = task->compacting_table()->schema()->ks_name();
ret.cf_name = task->compacting_table()->schema()->cf_name();
ret.total_partitions = task->compaction_data().total_partitions;
ret.total_keys_written = task->compaction_data().total_keys_written;
ret.compaction_uuid = task.compaction_data().compaction_uuid;
ret.type = task.compaction_type();
ret.ks_name = task.compacting_table()->schema()->ks_name();
ret.cf_name = task.compacting_table()->schema()->cf_name();
ret.total_partitions = task.compaction_data().total_partitions;
ret.total_keys_written = task.compaction_data().total_keys_written;
return ret;
};
using ret = std::vector<sstables::compaction_info>;
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction_task_executor>& task) {
return (!t || task->compacting_table() == t) && task->compaction_running();
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const compaction_task_executor& task) {
return (!t || task.compacting_table() == t) && task.compaction_running();
}) | boost::adaptors::transformed(to_info));
}
bool compaction_manager::has_table_ongoing_compaction(const table_state& t) const {
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction_task_executor>& task) {
return task->compacting_table() == &t && task->compaction_running();
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const compaction_task_executor& task) {
return task.compacting_table() == &t && task.compaction_running();
});
};
@@ -2269,8 +2272,8 @@ future<> compaction_manager::stop_compaction(sstring type, table_state* table) {
void compaction_manager::propagate_replacement(table_state& t,
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
for (auto& task : _tasks) {
if (task->compacting_table() == &t && task->compaction_running()) {
task->compaction_data().pending_replacements.push_back({ removed, added });
if (task.compacting_table() == &t && task.compaction_running()) {
task.compaction_data().pending_replacements.push_back({ removed, added });
}
}
}

View File

@@ -87,8 +87,13 @@ public:
private:
shared_ptr<compaction::task_manager_module> _task_manager_module;
using compaction_task_executor_list_type = bi::list<
compaction_task_executor,
bi::base_hook<bi::list_base_hook<bi::link_mode<bi::auto_unlink>>>,
bi::constant_time_size<false>>;
// compaction manager may have N fibers to allow parallel compaction per shard.
std::list<shared_ptr<compaction::compaction_task_executor>> _tasks;
compaction_task_executor_list_type _tasks;
// Possible states in which the compaction manager can be found.
//
@@ -464,7 +469,9 @@ public:
namespace compaction {
class compaction_task_executor : public enable_shared_from_this<compaction_task_executor> {
class compaction_task_executor
: public enable_shared_from_this<compaction_task_executor>
, public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
public:
enum class state {
none, // initial and final state

View File

@@ -5300,7 +5300,11 @@ class scylla_compaction_tasks(gdb.Command):
pass
task_hist = histogram(print_indicators=False)
task_list = list(std_list(cm['_tasks']))
try:
task_list = list(intrusive_list(cm['_tasks']))
except gdb.error: # 6.2 compatibility
task_list = list(std_list(cm['_tasks']))
for task in task_list:
task = seastar_shared_ptr(task).get().dereference()
schema = schema_ptr(task['_compacting_table'].dereference()['_schema'])

View File

@@ -532,11 +532,12 @@ void test_env_compaction_manager::propagate_replacement(compaction::table_state&
// Test version of compaction_manager::perform_compaction<>()
future<> test_env_compaction_manager::perform_compaction(shared_ptr<compaction::compaction_task_executor> task) {
_cm._tasks.push_back(task);
auto unregister_task = defer([this, task] {
if (_cm._tasks.remove(task) == 0) {
_cm._tasks.push_back(*task);
auto unregister_task = defer([task] {
if (!task->is_linked()) {
testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", task->compaction_data().compaction_uuid);
}
task->unlink();
task->switch_state(compaction_task_executor::state::none);
});
co_await task->run_compaction();