diff --git a/repair/repair.cc b/repair/repair.cc index a7d2af8741..e35702759d 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -346,12 +346,12 @@ float node_ops_metrics::repair_finished_percentage() { repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept : tasks::task_manager::module(tm, "repair") , _rs(rs) - , _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4)), + , _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range / 4)), named_semaphore_exception_factory{"repair range parallelism"}) { auto nr = _range_parallelism_semaphore.available_units(); rlogger.info("Setting max_repair_memory={}, max_repair_memory_per_range={}, max_repair_ranges_in_parallel={}", - max_repair_memory, max_repair_memory_per_range(), nr); + max_repair_memory, max_repair_memory_per_range, nr); } void repair_module::start(repair_uniq_id id) { @@ -916,6 +916,11 @@ private: } }; +static future start_repair_task(tasks::task_manager::task::task_impl_ptr task_impl_ptr, shared_ptr module, tasks::task_info pd = {}) { + auto task = co_await module->make_task(std::move(task_impl_ptr), pd); + task->start(); + co_return task; +} static future<> do_repair_ranges(lw_shared_ptr ri) { // Repair tables in the keyspace one after another @@ -1097,9 +1102,20 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map co_return id.id; } - // Do it in the background. 
- (void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace), germs = std::move(germs), - cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable { + auto task_impl_ptr = std::make_unique(_repair_module, id, std::move(keyspace), format("{}", streaming::stream_reason::repair), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes)); + auto task = co_await start_repair_task(std::move(task_impl_ptr), _repair_module); + co_return id.id; +} + +future<> user_requested_repair_task_impl::run() { + auto module = dynamic_pointer_cast(_module); + auto& rs = module->get_repair_service(); + auto& sharded_db = rs.get_db(); + auto& db = sharded_db.local(); + auto id = get_repair_uniq_id(); + + return module->run(id, [this, &rs, &db, id, keyspace = _status.keyspace, germs = std::move(_germs), + cfs = std::move(_cfs), ranges = std::move(_ranges), hosts = std::move(_hosts), data_centers = std::move(_data_centers), ignore_nodes = std::move(_ignore_nodes)] () mutable { auto uuid = id.uuid(); bool needs_flush_before_repair = false; @@ -1114,7 +1130,7 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map } bool hints_batchlog_flushed = false; - auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get(); + auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get(); if (needs_flush_before_repair) { auto waiting_nodes = db.get_token_metadata().get_all_endpoints(); std::erase_if(waiting_nodes, [&] (const auto& addr) { @@ -1125,11 +1141,11 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; try { - 
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> { + parallel_for_each(waiting_nodes, [this, &rs, uuid, &req, &participants] (gms::inet_address node) -> future<> { rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", uuid, node, participants); try { - auto& ms = get_messaging(); + auto& ms = rs.get_messaging(); auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); (void)resp; // nothing to do with response yet } catch (...) { @@ -1151,14 +1167,14 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map repair_results.reserve(smp::count); auto table_ids = get_table_ids(db, keyspace, cfs); abort_source as; - auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] { + auto off_strategy_updater = seastar::async([this, &rs, uuid = uuid.uuid(), &table_ids, &participants, &as] { auto tables = std::list(table_ids.begin(), table_ids.end()); auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables)); auto update_interval = std::chrono::seconds(30); while (!as.abort_requested()) { sleep_abortable(update_interval, as).get(); - parallel_for_each(participants, [this, uuid, &req] (gms::inet_address node) { - return _messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + parallel_for_each(participants, [this, &rs, uuid, &req] (gms::inet_address node) { + return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node); }).handle_exception([uuid, node] (std::exception_ptr ep) { rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node); @@ -1180,21 +1196,21 @@ future 
repair_service::do_repair_start(sstring keyspace, std::unordered_map rlogger.info("repair[{}]: Finished to shutdown off-strategy compaction updater", uuid); }); - auto cleanup_repair_range_history = defer([this, uuid] () mutable { + auto cleanup_repair_range_history = defer([this, &rs, uuid] () mutable { try { - this->cleanup_history(uuid).get(); + rs.cleanup_history(uuid).get(); } catch (...) { rlogger.warn("repair[{}]: Failed to cleanup history: {}", uuid, std::current_exception()); } }); - if (get_repair_module().is_aborted(id.uuid())) { + if (rs.get_repair_module().is_aborted(id.uuid())) { throw std::runtime_error("aborted by user request"); } for (auto shard : boost::irange(unsigned(0), smp::count)) { - auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, - data_centers = options.data_centers, hosts = options.hosts, ignore_nodes, germs] (repair_service& local_repair) mutable { + auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, + data_centers, hosts, ignore_nodes, germs] (repair_service& local_repair) mutable { local_repair.get_metrics().repair_total_ranges_sum += ranges.size(); auto ri = make_lw_shared(local_repair, std::move(keyspace), germs->get().shared_from_this(), std::move(ranges), std::move(table_ids), @@ -1219,9 +1235,8 @@ future repair_service::do_repair_start(sstring keyspace, std::unordered_map }).get(); }).handle_exception([id] (std::exception_ptr ep) { rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid(), ep); + return make_exception_future<>(ep); }); - - co_return id.id; } future repair_start(seastar::sharded& repair, @@ -1271,13 +1286,23 @@ future<> repair_service::sync_data_using_repair( } assert(this_shard_id() == 0); - auto& sharded_db = get_db(); + auto task_impl_ptr = std::make_unique(_repair_module, _repair_module->new_repair_uniq_id(), std::move(keyspace), format("{}", reason), "", std::move(ranges), std::move(neighbors), reason, 
ops_info); + auto task = co_await start_repair_task(std::move(task_impl_ptr), _repair_module); + // start_repair_task() has already called task->start(); only wait for completion here. + co_await task->done(); +} + +future<> data_sync_repair_task_impl::run() { + auto module = dynamic_pointer_cast(_module); + auto& rs = module->get_repair_service(); + auto& keyspace = _status.keyspace; + auto& sharded_db = rs.get_db(); auto& db = sharded_db.local(); auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace)); - repair_uniq_id id = get_repair_module().new_repair_uniq_id(); + auto id = get_repair_uniq_id(); rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace); - co_await get_repair_module().run(id, [this, id, &db, keyspace, germs = std::move(germs), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { + co_await module->run(id, [this, &rs, id, &db, keyspace, germs = std::move(germs), &ranges = _ranges, &neighbors = _neighbors, reason = _reason, ops_info = _ops_info] () mutable { auto cfs = list_column_families(db, keyspace); if (cfs.empty()) { rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace); @@ -1286,11 +1311,11 @@ future<> repair_service::sync_data_using_repair( } auto table_ids = get_table_ids(db, keyspace, cfs); std::vector> repair_results; repair_results.reserve(smp::count); - if (get_repair_module().is_aborted(id.uuid())) { + if (rs.get_repair_module().is_aborted(id.uuid())) { throw std::runtime_error("aborted by user request"); } for (auto shard : boost::irange(unsigned(0), smp::count)) { - auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info, germs] (repair_service& local_repair) mutable { + auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info, germs] (repair_service& local_repair) mutable { auto data_centers = std::vector(); auto hosts = std::vector(); auto 
ignore_nodes = std::unordered_set(); diff --git a/repair/repair_task.hh b/repair/repair_task.hh index 13f7c25d6c..6192ee9033 100644 --- a/repair/repair_task.hh +++ b/repair/repair_task.hh @@ -13,7 +13,7 @@ class repair_task_impl : public tasks::task_manager::task::impl { public: - repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id) + repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id) noexcept : tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id) { _status.progress_units = "ranges"; } @@ -28,6 +28,50 @@ protected: virtual future<> run() override = 0; }; +class user_requested_repair_task_impl : public repair_task_impl { +private: + lw_shared_ptr _germs; + std::vector _cfs; + dht::token_range_vector _ranges; + std::vector _hosts; + std::vector _data_centers; + std::unordered_set _ignore_nodes; +public: + user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string type, std::string entity, lw_shared_ptr germs, std::vector cfs, dht::token_range_vector ranges, std::vector hosts, std::vector data_centers, std::unordered_set ignore_nodes) noexcept + : repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(type), std::move(entity), tasks::task_id::create_null_id()) + , _germs(std::move(germs)) + , _cfs(std::move(cfs)) + , _ranges(std::move(ranges)) + , _hosts(std::move(hosts)) + , _data_centers(std::move(data_centers)) + , _ignore_nodes(std::move(ignore_nodes)) + {} +protected: + future<> run() override; + + // TODO: implement progress for user-requested repairs +}; + +class 
data_sync_repair_task_impl : public repair_task_impl { +private: + dht::token_range_vector _ranges; + std::unordered_map _neighbors; + streaming::stream_reason _reason; + shared_ptr _ops_info; +public: + data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string type, std::string entity, dht::token_range_vector ranges, std::unordered_map neighbors, streaming::stream_reason reason, shared_ptr ops_info) noexcept + : repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(type), std::move(entity), tasks::task_id::create_null_id()) + , _ranges(std::move(ranges)) + , _neighbors(std::move(neighbors)) + , _reason(reason) + , _ops_info(std::move(ops_info)) + {} +protected: + future<> run() override; + + // TODO: implement progress for data-sync repairs +}; + // The repair_module tracks ongoing repair operations and their progress. // A repair which has already finished successfully is dropped from this // table, but a failed repair will remain in the table forever so it can @@ -47,11 +91,12 @@ private: // The semaphore used to control the maximum // ranges that can be repaired in parallel. 
named_semaphore _range_parallelism_semaphore; - static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024; seastar::condition_variable _done_cond; void start(repair_uniq_id id); void done(repair_uniq_id id, bool succeeded); public: + static constexpr size_t max_repair_memory_per_range = 32 * 1024 * 1024; + repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept; repair_service& get_repair_service() noexcept { @@ -74,7 +119,6 @@ public: size_t nr_running_repair_jobs(); void abort_all_repairs(); named_semaphore& range_parallelism_semaphore(); - static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; } future<> run(repair_uniq_id id, std::function func); future repair_await_completion(int id, std::chrono::steady_clock::time_point timeout); float report_progress(streaming::stream_reason reason); diff --git a/repair/row_level.cc b/repair/row_level.cc index c6161d019e..e3fed91e87 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2449,7 +2449,7 @@ private: size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) { // Max buffer size per repair round - return is_rpc_stream_supported(algo) ? repair_module::max_repair_memory_per_range() : 256 * 1024; + return is_rpc_stream_supported(algo) ? 
repair_module::max_repair_memory_per_range : 256 * 1024; } // Step A: Negotiate sync boundary to use @@ -2733,7 +2733,7 @@ public: auto& mem_sem = _ri.rs.memory_sem(); auto max = _ri.rs.max_repair_memory(); - auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range(); + auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range; wanted = std::min(max, wanted); rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}", _ri.id.uuid(), wanted, mem_sem.current(), max); diff --git a/repair/row_level.hh b/repair/row_level.hh index cf744e1d39..7a0f559a14 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -227,6 +227,9 @@ public: future<> remove_repair_meta(); future get_next_repair_meta_id(); + + friend class user_requested_repair_task_impl; + friend class data_sync_repair_task_impl; }; class repair_info; diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index 37370ffb2b..fb637f1a9f 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -98,7 +98,7 @@ public: module_ptr _module; abort_source _as; public: - impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id) + impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id) noexcept : _status({ .id = id, .type = std::move(type), diff --git a/tasks/test_module.hh b/tasks/test_module.hh index e29e94e0d5..4cd9d87cd4 100644 --- a/tasks/test_module.hh +++ b/tasks/test_module.hh @@ -35,7 +35,7 @@ private: promise<> _finish_run; bool _finished = false; public: - test_task_impl(task_manager::module_ptr module, task_id id, uint64_t sequence_number = 0, std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_id parent_id = 
task_id::create_null_id()) + test_task_impl(task_manager::module_ptr module, task_id id, uint64_t sequence_number = 0, std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_id parent_id = task_id::create_null_id()) noexcept : task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id) {}