mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-29 20:57:00 +00:00
compaction/compaction_manager: make _tasks an intrusive list
_tasks is currently std::list<shared_ptr<compaction_task_executor>>, but it has no role in keeping the instances alive, this is done by the fibers which create the task (and pin a shared ptr instance). This lends itself to an intrusive list, avoiding that extra allocation upon push_back(). Using an intrusive list also makes it simpler and much cheaper (O(1) vs. O(N)) to remove tasks from the _tasks list. This will be made use of in the next patch. Code using _task has to be updated because the value_type changes from shared_ptr<compaction_task_executor> to compaction_task_executor&.
This commit is contained in:
committed by
Benny Halevy
parent
39b55bd3a0
commit
e942c074f2
@@ -188,7 +188,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const {
|
||||
return 0;
|
||||
}
|
||||
auto largest_fan_in = std::ranges::max(_tasks | boost::adaptors::transformed([] (auto& task) {
|
||||
return task->compaction_running() ? task->compaction_data().compaction_fan_in : 0;
|
||||
return task.compaction_running() ? task.compaction_data().compaction_fan_in : 0;
|
||||
}));
|
||||
// conservatively limit fan-in threshold to 32, such that tons of small sstables won't accumulate if
|
||||
// running major on a leveled table, which can even have more than one thousand files.
|
||||
@@ -599,9 +599,9 @@ requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&&
|
||||
}
|
||||
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args) {
|
||||
auto task_executor = seastar::make_shared<TaskExecutor>(*this, do_throw_if_stopping, std::forward<Args>(args)...);
|
||||
_tasks.push_back(task_executor);
|
||||
auto unregister_task = defer([this, task_executor] {
|
||||
_tasks.remove(task_executor);
|
||||
_tasks.push_back(*task_executor);
|
||||
auto unregister_task = defer([task_executor] {
|
||||
task_executor->unlink();
|
||||
task_executor->switch_state(compaction_task_executor::state::none);
|
||||
});
|
||||
|
||||
@@ -903,10 +903,10 @@ public:
|
||||
explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {}
|
||||
|
||||
bool has_ongoing_compaction(table_state& table_s) const noexcept override {
|
||||
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction_task_executor>& task) {
|
||||
return task->compaction_running()
|
||||
&& task->compacting_table()->schema()->ks_name() == s->ks_name()
|
||||
&& task->compacting_table()->schema()->cf_name() == s->cf_name();
|
||||
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const compaction_task_executor& task) {
|
||||
return task.compaction_running()
|
||||
&& task.compacting_table()->schema()->ks_name() == s->ks_name()
|
||||
&& task.compacting_table()->schema()->cf_name() == s->cf_name();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1092,9 +1092,12 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_e
|
||||
future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_state* t, std::optional<sstables::compaction_type> type_opt) noexcept {
|
||||
try {
|
||||
auto ongoing_compactions = get_compactions(t).size();
|
||||
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction_task_executor>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
|
||||
return (!t || task->compacting_table() == t) && (!type_opt || task->compaction_type() == *type_opt);
|
||||
}));
|
||||
auto tasks = _tasks
|
||||
| std::views::filter([t, type_opt] (const auto& task) {
|
||||
return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt);
|
||||
})
|
||||
| std::views::transform([] (auto& task) { return task.shared_from_this(); })
|
||||
| std::ranges::to<std::vector<shared_ptr<compaction_task_executor>>>();
|
||||
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info;
|
||||
if (cmlog.is_enabled(level)) {
|
||||
std::string scope = "";
|
||||
@@ -2018,7 +2021,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
|
||||
future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t, tasks::task_info info) {
|
||||
auto check_for_cleanup = [this, &t] {
|
||||
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
|
||||
return task->compacting_table() == &t && task->compaction_type() == sstables::compaction_type::Cleanup;
|
||||
return task.compacting_table() == &t && task.compaction_type() == sstables::compaction_type::Cleanup;
|
||||
});
|
||||
};
|
||||
if (check_for_cleanup()) {
|
||||
@@ -2198,11 +2201,11 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept {
|
||||
auto found = false;
|
||||
sstring msg;
|
||||
for (auto& task : _tasks) {
|
||||
if (task->compacting_table() == &t) {
|
||||
if (task.compacting_table() == &t) {
|
||||
if (!msg.empty()) {
|
||||
msg += "\n";
|
||||
}
|
||||
msg += format("Found {} after remove", *task.get());
|
||||
msg += format("Found {} after remove", task);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
@@ -2213,25 +2216,25 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept {
|
||||
}
|
||||
|
||||
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(table_state* t) const {
|
||||
auto to_info = [] (const shared_ptr<compaction_task_executor>& task) {
|
||||
auto to_info = [] (const compaction_task_executor& task) {
|
||||
sstables::compaction_info ret;
|
||||
ret.compaction_uuid = task->compaction_data().compaction_uuid;
|
||||
ret.type = task->compaction_type();
|
||||
ret.ks_name = task->compacting_table()->schema()->ks_name();
|
||||
ret.cf_name = task->compacting_table()->schema()->cf_name();
|
||||
ret.total_partitions = task->compaction_data().total_partitions;
|
||||
ret.total_keys_written = task->compaction_data().total_keys_written;
|
||||
ret.compaction_uuid = task.compaction_data().compaction_uuid;
|
||||
ret.type = task.compaction_type();
|
||||
ret.ks_name = task.compacting_table()->schema()->ks_name();
|
||||
ret.cf_name = task.compacting_table()->schema()->cf_name();
|
||||
ret.total_partitions = task.compaction_data().total_partitions;
|
||||
ret.total_keys_written = task.compaction_data().total_keys_written;
|
||||
return ret;
|
||||
};
|
||||
using ret = std::vector<sstables::compaction_info>;
|
||||
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction_task_executor>& task) {
|
||||
return (!t || task->compacting_table() == t) && task->compaction_running();
|
||||
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const compaction_task_executor& task) {
|
||||
return (!t || task.compacting_table() == t) && task.compaction_running();
|
||||
}) | boost::adaptors::transformed(to_info));
|
||||
}
|
||||
|
||||
bool compaction_manager::has_table_ongoing_compaction(const table_state& t) const {
|
||||
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction_task_executor>& task) {
|
||||
return task->compacting_table() == &t && task->compaction_running();
|
||||
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const compaction_task_executor& task) {
|
||||
return task.compacting_table() == &t && task.compaction_running();
|
||||
});
|
||||
};
|
||||
|
||||
@@ -2269,8 +2272,8 @@ future<> compaction_manager::stop_compaction(sstring type, table_state* table) {
|
||||
void compaction_manager::propagate_replacement(table_state& t,
|
||||
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
|
||||
for (auto& task : _tasks) {
|
||||
if (task->compacting_table() == &t && task->compaction_running()) {
|
||||
task->compaction_data().pending_replacements.push_back({ removed, added });
|
||||
if (task.compacting_table() == &t && task.compaction_running()) {
|
||||
task.compaction_data().pending_replacements.push_back({ removed, added });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,8 +87,13 @@ public:
|
||||
|
||||
private:
|
||||
shared_ptr<compaction::task_manager_module> _task_manager_module;
|
||||
|
||||
using compaction_task_executor_list_type = bi::list<
|
||||
compaction_task_executor,
|
||||
bi::base_hook<bi::list_base_hook<bi::link_mode<bi::auto_unlink>>>,
|
||||
bi::constant_time_size<false>>;
|
||||
// compaction manager may have N fibers to allow parallel compaction per shard.
|
||||
std::list<shared_ptr<compaction::compaction_task_executor>> _tasks;
|
||||
compaction_task_executor_list_type _tasks;
|
||||
|
||||
// Possible states in which the compaction manager can be found.
|
||||
//
|
||||
@@ -464,7 +469,9 @@ public:
|
||||
|
||||
namespace compaction {
|
||||
|
||||
class compaction_task_executor : public enable_shared_from_this<compaction_task_executor> {
|
||||
class compaction_task_executor
|
||||
: public enable_shared_from_this<compaction_task_executor>
|
||||
, public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
||||
public:
|
||||
enum class state {
|
||||
none, // initial and final state
|
||||
|
||||
@@ -5300,7 +5300,11 @@ class scylla_compaction_tasks(gdb.Command):
|
||||
pass
|
||||
task_hist = histogram(print_indicators=False)
|
||||
|
||||
task_list = list(std_list(cm['_tasks']))
|
||||
try:
|
||||
task_list = list(intrusive_list(cm['_tasks']))
|
||||
except gdb.error: # 6.2 compatibility
|
||||
task_list = list(std_list(cm['_tasks']))
|
||||
|
||||
for task in task_list:
|
||||
task = seastar_shared_ptr(task).get().dereference()
|
||||
schema = schema_ptr(task['_compacting_table'].dereference()['_schema'])
|
||||
|
||||
@@ -532,11 +532,12 @@ void test_env_compaction_manager::propagate_replacement(compaction::table_state&
|
||||
|
||||
// Test version of compaction_manager::perform_compaction<>()
|
||||
future<> test_env_compaction_manager::perform_compaction(shared_ptr<compaction::compaction_task_executor> task) {
|
||||
_cm._tasks.push_back(task);
|
||||
auto unregister_task = defer([this, task] {
|
||||
if (_cm._tasks.remove(task) == 0) {
|
||||
_cm._tasks.push_back(*task);
|
||||
auto unregister_task = defer([task] {
|
||||
if (!task->is_linked()) {
|
||||
testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", task->compaction_data().compaction_uuid);
|
||||
}
|
||||
task->unlink();
|
||||
task->switch_state(compaction_task_executor::state::none);
|
||||
});
|
||||
co_await task->run_compaction();
|
||||
|
||||
Reference in New Issue
Block a user