/*
 * Copyright (C) 2023-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include "compaction/compaction.hh"
#include "replica/database_fwd.hh"
#include "schema/schema_fwd.hh"
#include "tasks/task_manager.hh"

// Forward declarations of types that are defined elsewhere.
namespace sstables {
class sstable_directory;
}

namespace replica {
class reshard_shard_descriptor;
}

// NOTE(review): several template argument lists look stripped in this copy
// (e.g. `sharded&`, `std::vector&`, `std::optional fm`, `std::function`,
// `shared_ptr;`, `future>`). Tokens are kept exactly as found; confirm the
// intended arguments against the upstream header before compiling.

namespace compaction {

// Abstract base of the compaction task classes declared in this header.
//
// The constructor forwards every argument to tasks::task_manager::task::impl
// and sets `_status.progress_units` to "bytes". Concrete tasks must implement
// type() and run().
class compaction_task_impl : public tasks::task_manager::task::impl {
protected:
    // Declared mutable, so it may be modified from const member functions.
    mutable uint64_t _expected_workload = 0;
public:
    compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : tasks::task_manager::task::impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {
        _status.progress_units = "bytes";
    }

    // Name of the task kind; each family of tasks below returns its own string.
    virtual std::string type() const override = 0;
    virtual tasks::is_abortable is_abortable() const noexcept override;
protected:
    virtual future<> run() override = 0;

    // Workload/progress helpers; declared here, defined elsewhere.
    future get_table_task_workload(replica::database& db, const table_info& ti) const;
    future get_shard_task_workload(replica::database& db, const std::vector& tables) const;
    future get_keyspace_task_workload(sharded& db, const std::vector& tables) const;
    future get_progress(const compaction_data& cdata, const compaction_progress_monitor& progress_monitor) const;
};

// Handle type shared between the per-table tasks below (always passed together
// with a seastar::condition_variable).
using current_task_type = shared_ptr;

enum class flush_mode {
    skip,               // Skip flushing.  Useful when application explicitly flushes all tables prior to compaction
    compacted_tables,   // Flush only the compacted keyspace/tables
    all_tables          // Flush all tables in the database prior to compaction
};

// Common base of the major compaction tasks; type() is "major compaction".
// Stores the flush mode (default: flush_mode::compacted_tables) and the
// consider_only_existing_data flag (default: false) for derived classes.
class major_compaction_task_impl : public compaction_task_impl {
public:
    major_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id,
            flush_mode fm = flush_mode::compacted_tables,
            bool consider_only_existing_data = false) noexcept
        : compaction_task_impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
        , _flush_mode(fm)
        , _consider_only_existing_data(consider_only_existing_data)
    {}

    virtual std::string type() const override {
        return "major compaction";
    }
protected:
    flush_mode _flush_mode;
    bool _consider_only_existing_data;

    virtual future<> run() override = 0;
};

// Top-level major compaction task with scope "global".
// Created with a random id, a fresh sequence number, empty keyspace/table/entity
// and a null parent id. An unset `fm` falls back to flush_mode::all_tables.
class global_major_compaction_task_impl : public major_compaction_task_impl {
private:
    sharded& _db;
public:
    global_major_compaction_task_impl(tasks::task_manager::module_ptr module,
            sharded& db,
            std::optional fm = std::nullopt,
            bool consider_only_existing_data = false) noexcept
        : major_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "global", "", "", "", tasks::task_id::create_null_id(), fm.value_or(flush_mode::all_tables), consider_only_existing_data)
        , _db(db)
    {}

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Major compaction task with scope "keyspace".
// When `parent_id` is set the sequence number is 0; otherwise a new sequence
// number is taken from the module. An unset `fm` falls back to
// flush_mode::all_tables.
class major_keyspace_compaction_task_impl : public major_compaction_task_impl {
private:
    sharded& _db;
    std::vector _table_infos;

    // _cv and _current_task are engaged when the task is invoked from
    // global_major_compaction_task_impl
    seastar::condition_variable* _cv;
    current_task_type* _current_task;
public:
    major_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            tasks::task_id parent_id,
            sharded& db,
            std::vector table_infos,
            std::optional fm = std::nullopt,
            bool consider_only_existing_data = false,
            seastar::condition_variable* cv = nullptr,
            current_task_type* current_task = nullptr) noexcept
        : major_compaction_task_impl(module, tasks::task_id::create_random_id(), parent_id ? 0 : module->new_sequence_number(), "keyspace", std::move(keyspace), "", "", parent_id, fm.value_or(flush_mode::all_tables), consider_only_existing_data)
        , _db(db)
        , _table_infos(std::move(table_infos))
        , _cv(cv)
        , _current_task(current_task)
    {}

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Major compaction task with scope "shard"; always a child task
// (sequence number 0, caller-supplied parent id).
class shard_major_keyspace_compaction_task_impl : public major_compaction_task_impl {
private:
    replica::database& _db;
    std::vector _local_tables;
public:
    shard_major_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            tasks::task_id parent_id,
            replica::database& db,
            std::vector local_tables,
            flush_mode fm,
            bool consider_only_existing_data) noexcept
        : major_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), "", "", parent_id, fm, consider_only_existing_data)
        , _db(db)
        , _local_tables(std::move(local_tables))
    {}
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Major compaction task with scope "table"; always a child task
// (sequence number 0, caller-supplied parent id).
// Holds references to a condition variable and a current-task handle owned by
// the caller; both must outlive this object.
class table_major_keyspace_compaction_task_impl : public major_compaction_task_impl {
private:
    replica::database& _db;
    table_info _ti;
    seastar::condition_variable& _cv;
    current_task_type& _current_task;
public:
    table_major_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            replica::database& db,
            table_info ti,
            seastar::condition_variable& cv,
            current_task_type& current_task,
            flush_mode fm,
            bool consider_only_existing_data) noexcept
        : major_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "table", std::move(keyspace), std::move(table), "", parent_id, fm, consider_only_existing_data)
        , _db(db)
        , _ti(std::move(ti))
        , _cv(cv)
        , _current_task(current_task)
    {}
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Common base of the keyspace/shard/table cleanup tasks;
// type() is "cleanup compaction".
class cleanup_compaction_task_impl : public compaction_task_impl {
public:
    cleanup_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : compaction_task_impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {}

    virtual std::string type() const override {
        return "cleanup compaction";
    }
protected:
    virtual future<> run() override = 0;
};

// Top-level cleanup task with scope "keyspace" (fresh sequence number, null
// parent id). Stores the flush mode and the is_user_task value supplied by the
// caller.
class cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
private:
    sharded& _db;
    std::vector _table_infos;
    const flush_mode _flush_mode;
    tasks::is_user_task _is_user_task;
public:
    cleanup_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            sharded& db,
            std::vector table_infos,
            flush_mode mode,
            tasks::is_user_task is_user_task) noexcept
        : cleanup_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "keyspace", std::move(keyspace), "", "", tasks::task_id::create_null_id())
        , _db(db)
        , _table_infos(std::move(table_infos))
        , _flush_mode(mode)
        , _is_user_task(is_user_task)
    {}

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Top-level cleanup task with scope "global".
// Note that it derives directly from compaction_task_impl (not from
// cleanup_compaction_task_impl) and reports its own type string.
class global_cleanup_compaction_task_impl : public compaction_task_impl {
private:
    sharded& _db;
public:
    global_cleanup_compaction_task_impl(tasks::task_manager::module_ptr module,
            sharded& db) noexcept
        : compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "global", "", "", "", tasks::task_id::create_null_id())
        , _db(db)
    {}

    std::string type() const final {
        return "global cleanup compaction";
    }

    tasks::is_user_task is_user_task() const noexcept override;
private:
    future<> run() final;
    virtual future> expected_total_workload() const override;
};

// Cleanup task with scope "shard"; always a child task
// (sequence number 0, caller-supplied parent id).
class shard_cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
private:
    replica::database& _db;
    std::vector _local_tables;
public:
    shard_cleanup_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            tasks::task_id parent_id,
            replica::database& db,
            std::vector local_tables) noexcept
        : cleanup_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), "", "", parent_id)
        , _db(db)
        , _local_tables(std::move(local_tables))
    {}
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Cleanup task with scope "table"; always a child task
// (sequence number 0, caller-supplied parent id).
// Holds references to a caller-owned condition variable and current-task handle.
class table_cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
private:
    replica::database& _db;
    table_info _ti;
    seastar::condition_variable& _cv;
    current_task_type& _current_task;
public:
    table_cleanup_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            replica::database& db,
            table_info ti,
            seastar::condition_variable& cv,
            current_task_type& current_task) noexcept
        : cleanup_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "table", std::move(keyspace), std::move(table), "", parent_id)
        , _db(db)
        , _ti(std::move(ti))
        , _cv(cv)
        , _current_task(current_task)
    {}
protected:
    virtual future<> run() override;
    future> expected_total_workload() const override;
};

// Common base of the offstrategy compaction tasks;
// type() is "offstrategy compaction".
class offstrategy_compaction_task_impl : public compaction_task_impl {
public:
    offstrategy_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : compaction_task_impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {}

    virtual std::string type() const override {
        return "offstrategy compaction";
    }
protected:
    virtual future<> run() override = 0;
};

// Top-level offstrategy task with scope "keyspace" (fresh sequence number,
// null parent id).
class offstrategy_keyspace_compaction_task_impl : public offstrategy_compaction_task_impl {
private:
    sharded& _db;
    std::vector _table_infos;
    // NOTE(review): presumably an out-flag telling the caller whether
    // offstrategy compaction was needed; stored as a raw pointer - confirm
    // nullability and usage in the definition of run().
    bool* _needed;
public:
    offstrategy_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            sharded& db,
            std::vector table_infos,
            bool* needed) noexcept
        : offstrategy_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "keyspace", std::move(keyspace), "", "", tasks::task_id::create_null_id())
        , _db(db)
        , _table_infos(std::move(table_infos))
        , _needed(needed)
    {}

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Offstrategy task with scope "shard"; always a child task
// (sequence number 0, caller-supplied parent id).
// `_needed` refers to a caller-owned bool.
class shard_offstrategy_keyspace_compaction_task_impl : public offstrategy_compaction_task_impl {
private:
    replica::database& _db;
    std::vector _table_infos;
    bool& _needed;
public:
    shard_offstrategy_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            tasks::task_id parent_id,
            replica::database& db,
            std::vector table_infos,
            bool& needed) noexcept
        : offstrategy_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), "", "", parent_id)
        , _db(db)
        , _table_infos(std::move(table_infos))
        , _needed(needed)
    {}
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Offstrategy task with scope "table"; always a child task
// (sequence number 0, caller-supplied parent id).
// Holds references to a caller-owned condition variable, current-task handle
// and `needed` flag.
class table_offstrategy_keyspace_compaction_task_impl : public offstrategy_compaction_task_impl {
private:
    replica::database& _db;
    table_info _ti;
    seastar::condition_variable& _cv;
    current_task_type& _current_task;
    bool& _needed;
public:
    table_offstrategy_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            replica::database& db,
            table_info ti,
            seastar::condition_variable& cv,
            current_task_type& current_task,
            bool& needed) noexcept
        : offstrategy_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "table", std::move(keyspace), std::move(table), "", parent_id)
        , _db(db)
        , _ti(std::move(ti))
        , _cv(cv)
        , _current_task(current_task)
        , _needed(needed)
    {}
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Common base of the upgrade and scrub tasks; type() is "sstables compaction".
// Derived classes prefix this string with "upgrade " or "scrub ".
class sstables_compaction_task_impl : public compaction_task_impl {
public:
    sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : compaction_task_impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {}

    virtual std::string type() const override {
        return "sstables compaction";
    }
protected:
    virtual future<> run() override = 0;
};

// Top-level upgrade-sstables task with scope "keyspace" (fresh sequence number,
// null parent id). type() is "upgrade sstables compaction".
class upgrade_sstables_compaction_task_impl : public sstables_compaction_task_impl {
private:
    sharded& _db;
    std::vector _table_infos;
    bool _exclude_current_version;
public:
    upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            sharded& db,
            std::vector table_infos,
            bool exclude_current_version) noexcept
        : sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "keyspace", std::move(keyspace), "", "", tasks::task_id::create_null_id())
        , _db(db)
        , _table_infos(std::move(table_infos))
        , _exclude_current_version(exclude_current_version)
    {}

    virtual std::string type() const override {
        return "upgrade " + sstables_compaction_task_impl::type();
    }

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Upgrade-sstables task with scope "shard"; always a child task
// (sequence number 0, caller-supplied parent id).
class shard_upgrade_sstables_compaction_task_impl : public sstables_compaction_task_impl {
private:
    replica::database& _db;
    std::vector _table_infos;
    bool _exclude_current_version;
public:
    shard_upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            tasks::task_id parent_id,
            replica::database& db,
            std::vector table_infos,
            bool exclude_current_version) noexcept
        : sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), "", "", parent_id)
        , _db(db)
        , _table_infos(std::move(table_infos))
        , _exclude_current_version(exclude_current_version)
    {}

    virtual std::string type() const override {
        return "upgrade " + sstables_compaction_task_impl::type();
    }
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Upgrade-sstables task with scope "table"; always a child task
// (sequence number 0, caller-supplied parent id).
// Holds references to a caller-owned condition variable and current-task handle.
class table_upgrade_sstables_compaction_task_impl : public sstables_compaction_task_impl {
private:
    replica::database& _db;
    table_info _ti;
    seastar::condition_variable& _cv;
    current_task_type& _current_task;
    bool _exclude_current_version;
public:
    table_upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            replica::database& db,
            table_info ti,
            seastar::condition_variable& cv,
            current_task_type& current_task,
            bool exclude_current_version) noexcept
        : sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "table", std::move(keyspace), std::move(table), "", parent_id)
        , _db(db)
        , _ti(std::move(ti))
        , _cv(cv)
        , _current_task(current_task)
        , _exclude_current_version(exclude_current_version)
    {}

    virtual std::string type() const override {
        return "upgrade " + sstables_compaction_task_impl::type();
    }
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Top-level scrub task with scope "keyspace" (fresh sequence number, null
// parent id). type() is "scrub sstables compaction".
class scrub_sstables_compaction_task_impl : public sstables_compaction_task_impl {
private:
    sharded& _db;
    std::vector _column_families;
    compaction_type_options::scrub _opts;
    // NOTE(review): raw pointer supplied by the caller; presumably receives
    // the scrub statistics - confirm nullability in the definition of run().
    compaction_stats* _stats;
public:
    scrub_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            sharded& db,
            std::vector column_families,
            compaction_type_options::scrub opts,
            compaction_stats* stats) noexcept
        : sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "keyspace", std::move(keyspace), "", "", tasks::task_id::create_null_id())
        , _db(db)
        , _column_families(std::move(column_families))
        , _opts(opts)
        , _stats(stats)
    {}

    virtual std::string type() const override {
        return "scrub " + sstables_compaction_task_impl::type();
    }

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Scrub task with scope "shard"; always a child task
// (sequence number 0, caller-supplied parent id).
// `_stats` refers to a caller-owned compaction_stats object.
class shard_scrub_sstables_compaction_task_impl : public sstables_compaction_task_impl {
private:
    replica::database& _db;
    std::vector _column_families;
    compaction_type_options::scrub _opts;
    compaction_stats& _stats;
public:
    shard_scrub_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            tasks::task_id parent_id,
            replica::database& db,
            std::vector column_families,
            compaction_type_options::scrub opts,
            compaction_stats& stats) noexcept
        : sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), "", "", parent_id)
        , _db(db)
        , _column_families(std::move(column_families))
        , _opts(opts)
        , _stats(stats)
    {}

    virtual std::string type() const override {
        return "scrub " + sstables_compaction_task_impl::type();
    }
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Scrub task with scope "table"; always a child task
// (sequence number 0, caller-supplied parent id).
// `_stats` refers to a caller-owned compaction_stats object.
class table_scrub_sstables_compaction_task_impl : public sstables_compaction_task_impl {
private:
    replica::database& _db;
    compaction_type_options::scrub _opts;
    compaction_stats& _stats;
public:
    table_scrub_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            replica::database& db,
            compaction_type_options::scrub opts,
            compaction_stats& stats) noexcept
        : sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "table", std::move(keyspace), std::move(table), "", parent_id)
        , _db(db)
        , _opts(opts)
        , _stats(stats)
    {}

    virtual std::string type() const override {
        return "scrub " + sstables_compaction_task_impl::type();
    }
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Common base of the reshaping tasks; type() is "reshaping compaction".
class reshaping_compaction_task_impl : public compaction_task_impl {
public:
    reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : compaction_task_impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {}

    virtual std::string type() const override {
        return "reshaping compaction";
    }
protected:
    virtual future<> run() override = 0;
};

// Top-level reshaping task with scope "table" (fresh sequence number, null
// parent id). Unlike most tasks above it does not override
// expected_total_workload().
class table_reshaping_compaction_task_impl : public reshaping_compaction_task_impl {
private:
    sharded& _dir;
    sharded& _db;
    reshape_mode _mode;
    compaction_sstable_creator_fn _creator;
    std::function _filter;
public:
    table_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            sharded& dir,
            sharded& db,
            reshape_mode mode,
            compaction_sstable_creator_fn creator,
            std::function filter) noexcept
        : reshaping_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
        , _dir(dir)
        , _db(db)
        , _mode(mode)
        , _creator(std::move(creator))
        , _filter(std::move(filter))
    {}
protected:
    virtual future<> run() override;
};

// Reshaping task with scope "shard"; always a child task
// (sequence number 0, caller-supplied parent id).
// `_total_shard_size` refers to a caller-owned counter.
class shard_reshaping_compaction_task_impl : public reshaping_compaction_task_impl {
private:
    sstables::sstable_directory& _dir;
    sharded& _db;
    reshape_mode _mode;
    compaction_sstable_creator_fn _creator;
    std::function _filter;
    uint64_t& _total_shard_size;

    // Private helper; declared here, defined elsewhere.
    future<> reshape_compaction_group(compaction::compaction_group_view& t, std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info);
public:
    shard_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            sstables::sstable_directory& dir,
            sharded& db,
            reshape_mode mode,
            compaction_sstable_creator_fn creator,
            std::function filter,
            uint64_t& total_shard_size) noexcept
        : reshaping_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), std::move(table), "", parent_id)
        , _dir(dir)
        , _db(db)
        , _mode(mode)
        , _creator(std::move(creator))
        , _filter(std::move(filter))
        , _total_shard_size(total_shard_size)
    {}
protected:
    virtual future<> run() override;
};

// Common base of the resharding tasks; type() is "resharding compaction".
class resharding_compaction_task_impl : public compaction_task_impl {
public:
    resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string scope,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : compaction_task_impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {}

    virtual std::string type() const override {
        return "resharding compaction";
    }
protected:
    virtual future<> run() override = 0;
};

// NOTE(review): several template argument lists look stripped in this copy
// (e.g. `sharded&`, `std::vector&`, `future>`, `fmt::formatter {`). Tokens are
// kept exactly as found; confirm the intended arguments against the upstream
// header before compiling.

// Top-level resharding task with scope "table".
// Created with a random id, a fresh sequence number from the module, an empty
// entity and a null parent id.
class table_resharding_compaction_task_impl : public resharding_compaction_task_impl {
private:
    sharded& _dir;
    sharded& _db;
    compaction_sstable_creator_fn _creator;
    compaction::owned_ranges_ptr _owned_ranges_ptr;
public:
    table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            sharded& dir,
            sharded& db,
            compaction_sstable_creator_fn creator,
            compaction::owned_ranges_ptr owned_ranges_ptr) noexcept
        : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
        , _dir(dir)
        , _db(db)
        , _creator(std::move(creator))
        , _owned_ranges_ptr(std::move(owned_ranges_ptr))
    {}
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Per-shard resharding task.
// Unlike the other tasks in this header, its constructor is only declared
// here; the definition (and therefore the scope string and sequence number it
// passes to the base class) lives elsewhere.
// `_destinations` refers to a caller-owned vector.
class shard_resharding_compaction_task_impl : public resharding_compaction_task_impl {
private:
    sharded& _dir;
    replica::database& _db;
    compaction_sstable_creator_fn _creator;
    compaction::owned_ranges_ptr _local_owned_ranges_ptr;
    std::vector& _destinations;
public:
    shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
            std::string keyspace,
            std::string table,
            tasks::task_id parent_id,
            sharded& dir,
            replica::database& db,
            compaction_sstable_creator_fn creator,
            compaction::owned_ranges_ptr local_owned_ranges_ptr,
            std::vector& destinations) noexcept;
protected:
    virtual future<> run() override;
    virtual future> expected_total_workload() const override;
};

// Task manager module for compaction tasks; registers itself under the name
// "compaction".
class task_manager_module : public tasks::task_manager::module {
public:
    task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}
};

// Abstract base for regular compaction tasks.
// The scope is fixed to "compaction group", type() is "regular compaction",
// and the task is always reported as internal.
class regular_compaction_task_impl : public compaction_task_impl {
public:
    regular_compaction_task_impl(tasks::task_manager::module_ptr module,
            tasks::task_id id,
            unsigned sequence_number,
            std::string keyspace,
            std::string table,
            std::string entity,
            tasks::task_id parent_id) noexcept
        : compaction_task_impl(module, id, sequence_number, "compaction group", std::move(keyspace), std::move(table), std::move(entity), parent_id)
    {}

    virtual std::string type() const override {
        return "regular compaction";
    }

    virtual tasks::is_internal is_internal() const noexcept override {
        return tasks::is_internal::yes;
    }
protected:
    virtual future<> run() override = 0;
};

} // namespace compaction

// fmt formatter specialization; format() takes a compaction::flush_mode.
// parse() returns ctx.begin() without examining any format specification.
// format() is declared here and defined elsewhere.
template <>
struct fmt::formatter {
    constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
    auto format(compaction::flush_mode, fmt::format_context& ctx) const -> decltype(ctx.out());
};