compaction: create task manager's task for rewrite sstables keyspace compaction on one shard

Implementation of task_manager's task that covers rewrite sstables keyspace
compaction on one shard.
This commit is contained in:
Aleksandra Martyniuk
2023-02-02 15:43:12 +01:00
parent c4098df4ec
commit a93f044efa
2 changed files with 39 additions and 5 deletions

View File

@@ -107,11 +107,22 @@ future<> local_offstrategy_keyspace_compaction_task_impl::run() {
future<> upgrade_sstables_compaction_task_impl::run() {
co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(_status.keyspace));
co_await run_on_existing_tables("upgrade_sstables", db, _status.keyspace, _table_infos, [&] (replica::table& t) -> future<> {
return t.parallel_foreach_table_state([&] (compaction::table_state& ts) -> future<> {
return t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version);
});
tasks::task_info parent_info{_status.id, _status.shard};
auto& compaction_module = db.get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<shard_upgrade_sstables_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, _exclude_current_version);
co_await task->done();
});
}
tasks::is_internal shard_upgrade_sstables_compaction_task_impl::is_internal() const noexcept {
return tasks::is_internal::yes;
}
future<> shard_upgrade_sstables_compaction_task_impl::run() {
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(_db.get_keyspace_local_ranges(_status.keyspace));
co_await run_on_existing_tables("upgrade_sstables", _db, _status.keyspace, _table_infos, [&] (replica::table& t) -> future<> {
return t.parallel_foreach_table_state([&] (compaction::table_state& ts) -> future<> {
return t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version);
});
});
}

View File

@@ -254,6 +254,29 @@ protected:
virtual future<> run() override;
};
class shard_upgrade_sstables_compaction_task_impl : public rewrite_sstables_compaction_task_impl {
private:
replica::database& _db;
std::vector<table_id> _table_infos;
bool _exclude_current_version;
public:
shard_upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
tasks::task_id parent_id,
replica::database& db,
std::vector<table_id> table_infos,
bool exclude_current_version) noexcept
: rewrite_sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), "", "", parent_id)
, _db(db)
, _table_infos(std::move(table_infos))
, _exclude_current_version(exclude_current_version)
{}
virtual tasks::is_internal is_internal() const noexcept override;
protected:
virtual future<> run() override;
};
class task_manager_module : public tasks::task_manager::module {
public:
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}