/*
 * Copyright (C) 2022-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

// NOTE(review): this header appears to have been damaged in transit: the whole
// file was collapsed onto a handful of physical lines and, more importantly,
// most angle-bracketed template argument lists have been stripped (see e.g.
// "lw_shared_ptr _germs;", "std::vector _cfs;", "future>",
// "virtual future get_progress()"). The code below preserves the surviving
// tokens verbatim; the missing template arguments must be restored from
// version control before this header can compile — TODO confirm against the
// original file.

#pragma once

#include "node_ops/node_ops_ctl.hh"
#include "repair/repair.hh"
#include "service/topology_guard.hh"
#include "streaming/stream_reason.hh"
#include "tasks/task_manager.hh"

namespace repair {

// Common base for all repair tasks registered with the task manager.
// Stores the streaming::stream_reason that triggered the repair; the reason
// also serves as the task's reported type() string. Progress for all repair
// tasks is measured in token "ranges".
class repair_task_impl : public tasks::task_manager::task::impl {
protected:
    streaming::stream_reason _reason;   // why this repair runs (user request, node op, ...)
public:
    // Forwards identification/bookkeeping fields to the generic task impl and
    // records the repair reason. Progress units are fixed to "ranges".
    repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string scope, std::string keyspace, std::string table, std::string entity, tasks::task_id parent_id, streaming::stream_reason reason) noexcept
        : tasks::task_manager::task::impl(module, id, sequence_number, std::move(scope), std::move(keyspace), std::move(table), std::move(entity), parent_id)
        , _reason(reason) {
        _status.progress_units = "ranges";
    }

    // The task type shown to users is the formatted stream reason.
    virtual std::string type() const override {
        return format("{}", _reason);
    }
protected:
    // Rebuilds the repair-wide unique id from the task's status fields
    // (sequence number + task id + owning shard).
    repair_uniq_id get_repair_uniq_id() const noexcept {
        return repair_uniq_id{
            .id = _status.sequence_number,
            .task_info = tasks::task_info(_status.id, _status.shard)
        };
    }

    virtual future<> run() override = 0;
};

// Repair initiated explicitly by a user (REST API / nodetool repair).
// Always runs with streaming::stream_reason::repair and is unconditionally
// abortable through the task manager.
// NOTE(review): the container/element template arguments of the members below
// were stripped during extraction — restore from the original header.
class user_requested_repair_task_impl : public repair_task_impl {
private:
    lw_shared_ptr _germs;                 // presumably the effective replication map holder — TODO confirm element type
    std::vector _cfs;                     // column families (tables) to repair
    dht::token_range_vector _ranges;      // token ranges to repair
    std::vector _hosts;                   // user-restricted host list
    std::vector _data_centers;            // user-restricted DC list
    std::unordered_set _ignore_nodes;     // nodes excluded from this repair
    bool _small_table_optimization;
    std::optional _ranges_parallelism;    // user-requested cap on parallel ranges, if any
    gms::gossiper& _gossiper;
public:
    user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr germs, std::vector cfs, dht::token_range_vector ranges, std::vector hosts, std::vector data_centers, std::unordered_set ignore_nodes, bool small_table_optimization, std::optional ranges_parallelism, gms::gossiper& gossiper) noexcept
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
        , _germs(germs)
        , _cfs(std::move(cfs))
        , _ranges(std::move(ranges))
        , _hosts(std::move(hosts))
        , _data_centers(std::move(data_centers))
        , _ignore_nodes(std::move(ignore_nodes))
        , _small_table_optimization(small_table_optimization)
        , _ranges_parallelism(ranges_parallelism)
        , _gossiper(gossiper)
    {}

    // User repairs can always be aborted.
    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable::yes;
    }

    tasks::is_user_task is_user_task() const noexcept override;
protected:
    future<> run() override;

    // NOTE(review): return type's template arguments were stripped ("future>").
    virtual future> expected_total_workload() const override;
};

// Repair driven by a node operation (e.g. replace/removenode data sync)
// rather than by a user. If the node-ops info carries an abort source, this
// task subscribes to it and abort requests arrive through that subscription;
// in that case the task reports itself as NOT abortable via the task manager
// (see is_abortable below).
class data_sync_repair_task_impl : public repair_task_impl {
private:
    dht::token_range_vector _ranges;
    std::unordered_map _neighbors;        // per-range repair neighbors — template args stripped
    optimized_optional _abort_subscription;
    size_t _cfs_size = 0;
public:
    data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, dht::token_range_vector ranges, std::unordered_map neighbors, streaming::stream_reason reason, shared_ptr ops_info)
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), reason)
        , _ranges(std::move(ranges))
        , _neighbors(std::move(neighbors))
    {
        // Chain the node-op's abort source into this task: when the node op
        // aborts, abort the repair as well.
        if (ops_info && ops_info->as) {
            _abort_subscription = ops_info->as->subscribe([this] () noexcept {
                abort();
            });
        }
    }

    // Abortable through the task manager only when NOT already wired to a
    // node-op abort source (avoids two competing abort paths).
    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable(!_abort_subscription);
    }
protected:
    future<> run() override;

    // NOTE(review): return type's template arguments were stripped ("future>").
    virtual future> expected_total_workload() const override;
};

// Repair of tablet-based tables. Tracks optional pre-repair flush state:
// get_flush_time() throws if a required flush failed.
class tablet_repair_task_impl : public repair_task_impl {
private:
    sstring _keyspace;
    std::vector _tables;                  // table names — template args stripped
    std::vector _metas;                   // per-tablet repair metadata — template args stripped
    optimized_optional _abort_subscription;
    std::optional _ranges_parallelism;
    size_t _metas_size = 0;
    gc_clock::time_point _flush_time = gc_clock::time_point();
    bool _should_flush_and_flush_failed = false;
    service::frozen_topology_guard _topo_guard;
    bool _skip_flush;
public:
    tablet_repair_sched_info sched_info;
public:
    tablet_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, sstring keyspace, tasks::task_id parent_id, std::vector tables, streaming::stream_reason reason, std::vector metas, std::optional ranges_parallelism, service::frozen_topology_guard topo_guard, tablet_repair_sched_info sched_info, bool skip_flush = false)
        : repair_task_impl(module, id.uuid(), id.id, "keyspace", keyspace, "", "", parent_id, reason)
        , _keyspace(std::move(keyspace))
        , _tables(std::move(tables))
        , _metas(std::move(metas))
        , _ranges_parallelism(ranges_parallelism)
        , _topo_guard(topo_guard)
        , _skip_flush(skip_flush)
        , sched_info(std::move(sched_info))
    {
    }

    // Same rule as data_sync_repair_task_impl: only abortable via the task
    // manager when no external abort subscription is installed.
    virtual tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable(!_abort_subscription);
    }

    // Returns the recorded flush time; throws if a flush was required for
    // this repair but failed.
    gc_clock::time_point get_flush_time() const {
        if (_should_flush_and_flush_failed) {
            throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
        }
        return _flush_time;
    }

    tasks::is_user_task is_user_task() const noexcept override;

    virtual future<> release_resources() noexcept override;
private:
    size_t get_metas_size() const noexcept;
protected:
    future<> run() override;

    // NOTE(review): return type's template arguments were stripped ("future>").
    virtual future> expected_total_workload() const override;
};

// Per-shard execution of a repair: holds the shard-local state (ranges,
// neighbors, statistics, failure bookkeeping) and performs the actual
// range-by-range repair work on this shard.
// NOTE(review): many member/parameter template argument lists were stripped
// during extraction — restore from the original header.
class shard_repair_task_impl : public repair_task_impl {
public:
    repair_service& rs;
    seastar::sharded& db;          // sharded database — template args stripped
    seastar::sharded& messaging;   // sharded messaging service — template args stripped
    service::migration_manager& mm;
    gms::gossiper& gossiper;
private:
    locator::effective_replication_map_ptr erm;
public:
    dht::token_range_vector ranges;
    std::vector cfs;               // table names
    std::vector table_ids;
    repair_uniq_id global_repair_id;
    std::vector data_centers;
    std::vector hosts;
    std::unordered_set ignore_nodes;
    std::unordered_map neighbors;  // per-range repair neighbors
    uint64_t nr_ranges_finished = 0;
    size_t nr_failed_ranges = 0;
    int ranges_index = 0;
    repair_stats _stats;
    std::unordered_set dropped_tables;   // tables dropped mid-repair, skipped thereafter
    bool _hints_batchlog_flushed = false;
    std::unordered_set nodes_down;
    bool _small_table_optimization = false;
    size_t small_table_optimization_ranges_reduced_factor = 1;
private:
    bool _aborted = false;
    std::optional _failed_because;           // first failure description, if any
    std::optional _user_ranges_parallelism;
    uint64_t _ranges_complete = 0;
    gc_clock::time_point _flush_time;
    service::frozen_topology_guard _frozen_topology_guard;
    service::topology_guard _topology_guard = {service::null_topology_guard};
public:
    tablet_repair_sched_info sched_info;
public:
    shard_repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, sstring keyspace, repair_service& repair, locator::effective_replication_map_ptr erm_, dht::token_range_vector ranges_, std::vector table_ids_, repair_uniq_id parent_id_, std::vector data_centers_, std::vector hosts_, std::unordered_set ignore_nodes_, std::unordered_map neighbors_, streaming::stream_reason reason_, bool hints_batchlog_flushed, bool small_table_optimization, std::optional ranges_parallelism, gc_clock::time_point flush_time, service::frozen_topology_guard topo_guard, tablet_repair_sched_info sched_info = tablet_repair_sched_info(), size_t small_table_optimization_ranges_reduced_factor_ = 1);
    void check_failed_ranges();
    void check_in_abort_or_shutdown();
    repair_neighbors get_repair_neighbors(const dht::token_range& range);
    gc_clock::time_point get_flush_time() const {
        return _flush_time;
    }
    // Accumulate per-range statistics into the shard-wide totals.
    void update_statistics(const repair_stats& stats) {
        _stats.add(stats);
    }
    const std::vector& table_names() {
        return cfs;
    }
    const std::string& get_keyspace() const noexcept {
        return _status.keyspace;
    }
    streaming::stream_reason reason() const noexcept {
        return _reason;
    }
    bool hints_batchlog_flushed() const {
        return _hints_batchlog_flushed;
    }
    locator::effective_replication_map_ptr get_erm();
    size_t get_total_rf() {
        return get_erm()->get_replication_factor();
    }
    future<> repair_range(const dht::token_range& range, table_info table);
    size_t ranges_size() const noexcept;
    virtual future<> release_resources() noexcept override;
protected:
    future<> do_repair_ranges();
    // NOTE(review): return type's template arguments were stripped ("future").
    virtual future get_progress() const override;
    future<> run() override;
};

// The repair::task_manager_module tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anyway).
class task_manager_module : public tasks::task_manager::module {
private:
    repair_service& _rs;
    // Note that there are no "SUCCESSFUL" entries in the "status" map:
    // Successfully-finished repairs are those with id <= repair_module::_sequence_number
    // but aren't listed as running or failed in the status map.
    std::unordered_map _status;       // template args stripped — presumably id -> repair_status
    // Map repair id into repair_info.
    std::unordered_map _repairs;
    std::unordered_set _pending_repairs;
    // The semaphore used to control the maximum
    // ranges that can be repaired in parallel.
    named_semaphore _range_parallelism_semaphore;
    seastar::condition_variable _done_cond;   // signaled when a repair finishes; awaited by repair_await_completion
    void start(repair_uniq_id id);
    void done(repair_uniq_id id, bool succeeded);
public:
    static constexpr size_t max_repair_memory_per_range = 32 * 1024 * 1024;

    task_manager_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;

    repair_service& get_repair_service() noexcept {
        return _rs;
    }

    // Allocates a fresh repair id: next sequence number plus a random task id
    // bound to the calling shard.
    repair_uniq_id new_repair_uniq_id() noexcept {
        return repair_uniq_id{
            .id = new_sequence_number(),
            .task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
        };
    }

    repair_status get(int id) const;
    void check_in_shutdown();
    void add_shard_task_id(int id, tasks::task_id ri);
    void remove_shard_task_id(int id);
    tasks::task_manager::task_ptr get_shard_task_ptr(int id);
    std::vector get_active() const;
    size_t nr_running_repair_jobs();
    void abort_all_repairs();
    named_semaphore& range_parallelism_semaphore();
    // NOTE(review): several template argument lists below were stripped
    // (e.g. "std::function func", bare "future" returns).
    future<> run(repair_uniq_id id, std::function func);
    future repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
    float report_progress();
    future is_aborted(const tasks::task_id& uuid, shard_id shard);
};

}