From a93f044efa646b784798ef91cd88e13d7df6d110 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Thu, 2 Feb 2023 15:43:12 +0100 Subject: [PATCH] compaction: create task manager's task for rewrite sstables keyspace compaction on one shard Implementation of task_manager's task that covers rewrite sstables keyspace compaction on one shard. --- compaction/task_manager_module.cc | 21 ++++++++++++++++----- compaction/task_manager_module.hh | 23 +++++++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index c11aa97364..6a6286b130 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -107,11 +107,22 @@ future<> local_offstrategy_keyspace_compaction_task_impl::run() { future<> upgrade_sstables_compaction_task_impl::run() { co_await _db.invoke_on_all([&] (replica::database& db) -> future<> { - auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(_status.keyspace)); - co_await run_on_existing_tables("upgrade_sstables", db, _status.keyspace, _table_infos, [&] (replica::table& t) -> future<> { - return t.parallel_foreach_table_state([&] (compaction::table_state& ts) -> future<> { - return t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version); - }); + tasks::task_info parent_info{_status.id, _status.shard}; + auto& compaction_module = db.get_compaction_manager().get_task_manager_module(); + auto task = co_await compaction_module.make_and_start_task(parent_info, _status.keyspace, _status.id, db, _table_infos, _exclude_current_version); + co_await task->done(); + }); +} + +tasks::is_internal shard_upgrade_sstables_compaction_task_impl::is_internal() const noexcept { + return tasks::is_internal::yes; +} + +future<> shard_upgrade_sstables_compaction_task_impl::run() { + auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(_db.get_keyspace_local_ranges(_status.keyspace)); + co_await run_on_existing_tables("upgrade_sstables", _db, _status.keyspace, _table_infos, [&] (replica::table& t) -> future<> { + return t.parallel_foreach_table_state([&] (compaction::table_state& ts) -> future<> { + return t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version); }); }); } diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index e838cf52db..5dda87e869 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -254,6 +254,29 @@ protected: virtual future<> run() override; }; +class shard_upgrade_sstables_compaction_task_impl : public rewrite_sstables_compaction_task_impl { +private: + replica::database& _db; + std::vector _table_infos; + bool _exclude_current_version; +public: + shard_upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module, + std::string keyspace, + tasks::task_id parent_id, + replica::database& db, + std::vector table_infos, + bool exclude_current_version) noexcept + : rewrite_sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), "", "", parent_id) + , _db(db) + , _table_infos(std::move(table_infos)) + , _exclude_current_version(exclude_current_version) + {} + + virtual tasks::is_internal is_internal() const noexcept override; +protected: + virtual future<> run() override; +}; + class task_manager_module : public tasks::task_manager::module { public: task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}