/*
 * Copyright (C) 2024-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "db/system_keyspace.hh"
#include "node_ops/task_manager_module.hh"
#include "service/storage_service.hh"
#include "service/topology_coordinator.hh"
#include "service/topology_state_machine.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
// NOTE(review): the header name of this include was lost in the mangled source;
// <variant> is assumed because this file uses std::variant/std::visit - confirm.
#include <variant>
#include "utils/overloaded_functor.hh"

using namespace std::chrono_literals;

namespace node_ops {

// Translates the request type stored in a topology request entry into the
// task type string reported for the task.
//
// join/remove/leave get fixed human-readable names; any other
// topology_request and every global_topology_request is rendered with
// fmt::to_string(). An unset request type (std::monostate) yields "".
//
// NOTE(review): the variant's template arguments were stripped from the
// mangled source and are reconstructed from the visitor overloads below;
// confirm the alternative order against topology_requests_entry::request_type.
static sstring request_type_to_task_type(const std::variant<std::monostate, service::topology_request, service::global_topology_request>& request_type) {
    return std::visit(overloaded_functor {
        [] (const service::topology_request& type) -> sstring {
            switch (type) {
                case service::topology_request::join:
                    return "bootstrap";
                case service::topology_request::remove:
                    return "remove node";
                case service::topology_request::leave:
                    return "decommission";
                default:
                    return fmt::to_string(type);
            }
        },
        [] (const service::global_topology_request type) -> sstring {
            return fmt::to_string(type);
        },
        [] (const std::monostate type) -> sstring {
            return "";
        }
    }, request_type);
}

// Derives the task state from a topology request entry:
//   - entry.id tests false      -> created
//   - entry not marked done     -> running
//   - done with an empty error  -> done
//   - done with an error string -> failed
static tasks::task_manager::task_state get_state(const db::system_keyspace::topology_requests_entry& entry) {
    if (!entry.id) {
        return tasks::task_manager::task_state::created;
    } else if (!entry.done) {
        return tasks::task_manager::task_state::running;
    } else if (entry.error.empty()) {
        return tasks::task_manager::task_state::done;
    } else {
        return tasks::task_manager::task_state::failed;
    }
}

// Reads node-ops request entries from the system keyspace, passing
// `now - ttl` as the time argument.
// NOTE(review): presumably this is a lower bound on the entries' end time;
// confirm against system_keyspace::get_node_ops_request_entries().
static future<db::system_keyspace::topology_requests_entries> get_entries(db::system_keyspace& sys_ks, std::chrono::seconds ttl) {
    return sys_ks.get_node_ops_request_entries(db_clock::now() - ttl);
}

// Builds the task_stats for a topology request entry.
//
// The entity is the string form of hint.node_id when the hint carries one and
// an empty string otherwise. Keyspace and table are always empty, and shard
// and sequence number are always 0.
static tasks::task_stats get_task_stats(const db::system_keyspace::topology_requests_entry& entry, const tasks::virtual_task_hint& hint) {
    return tasks::task_stats{
        .task_id = tasks::task_id{entry.id},
        .type = request_type_to_task_type(entry.request_type),
        .kind = tasks::task_kind::cluster,
        .scope = "cluster",
        .state = get_state(entry),
        .sequence_number = 0,
        .keyspace = "",
        .table = "",
        .entity = hint.node_id ? fmt::to_string(*hint.node_id) : "",
        .shard = 0,
        .start_time = entry.start_time,
        .end_time = entry.end_time,
    };
}

// Returns the status of the topology request identified by `id`, or
// std::nullopt when the system keyspace has no entry for it.
//
// Most fields are copied from get_task_stats(); the error comes from the
// entry, abortability from is_abortable(hint) and the children from
// get_children().
future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto entry_opt = co_await _ss._sys_ks.local().get_topology_request_entry_opt(id.uuid());
    if (!entry_opt) {
        co_return std::nullopt;
    }
    auto& entry = *entry_opt;
    // Computed before `hint` is moved from in the initializer below.
    auto stats = get_task_stats(entry, hint);
    co_return tasks::task_status{
        .task_id = stats.task_id,
        .type = stats.type,
        .kind = stats.kind,
        .scope = stats.scope,
        .state = stats.state,
        .is_abortable = co_await is_abortable(std::move(hint)),
        .start_time = stats.start_time,
        .end_time = stats.end_time,
        .error = entry.error,
        .parent_id = tasks::task_id::create_null_id(),
        .sequence_number = stats.sequence_number,
        .shard = stats.shard,
        .keyspace = stats.keyspace,
        .table = stats.table,
        .entity = stats.entity,
        .progress_units = "",
        .progress = tasks::task_manager::task::progress{},
        .children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr())
    };
}

// The task group of this virtual task is always topology_change_group.
tasks::task_manager::task_group node_ops_virtual_task::get_group() const noexcept {
    return tasks::task_manager::task_group::topology_change_group;
}

// Collects a request id -> host id map from the in-memory topology.
//
// Covers every node listed in topology.requests that topology.find() can
// locate, plus every node in topology.transition_nodes. std::map::emplace
// keeps the first mapping when the same request id is seen twice.
static std::map<tasks::task_id, locator::host_id> get_requests(const service::topology& topology) {
    std::map<tasks::task_id, locator::host_id> result;
    for (auto& request : topology.requests) {
        auto* rs = topology.find(request.first);
        if (rs) {
            result.emplace(tasks::task_id(rs->second.request_id), locator::host_id(request.first.uuid()));
        }
    }
    for (auto& [node, rs] : topology.transition_nodes) {
        result.emplace(tasks::task_id(rs.request_id), locator::host_id(node.uuid()));
    }
    return result;
}

// Checks whether `task_id` belongs to this virtual task.
//
// Returns:
//   - std::nullopt if the id is not a timestamp UUID;
//   - a hint with node_id set if the in-memory topology knows the request;
//   - a hint without node_id if the system keyspace has an entry for the id
//     whose request type holds a service::topology_request;
//   - std::nullopt otherwise.
future<std::optional<tasks::virtual_task_hint>> node_ops_virtual_task::contains(tasks::task_id task_id) const {
    if (!task_id.uuid().is_timestamp()) {
        // Task id of node ops operation is always a timestamp.
        co_return std::nullopt;
    }

    auto hint = std::make_optional<tasks::virtual_task_hint>({});
    service::topology& topology = _ss._topology_state_machine._topology;
    auto reqs = get_requests(topology);
    // Single lookup instead of contains() followed by at().
    if (auto it = reqs.find(task_id); it != reqs.end()) {
        hint->node_id = it->second;
        co_return hint;
    }

    auto entry = co_await _ss._sys_ks.local().get_topology_request_entry_opt(task_id.uuid());
    co_return entry && std::holds_alternative<service::topology_request>(entry->request_type) ? hint : std::nullopt;
}

// A task is reported as abortable exactly when the hint carries a node id.
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint hint) const {
    // Currently, only node operations are supported by abort_topology_request().
    return make_ready_future<tasks::is_abortable>(tasks::is_abortable(hint.node_id.has_value()));
}

// Waits for the topology request `id` to complete and returns its status.
//
// Returns std::nullopt without waiting when get_status() finds no entry.
// The status is fetched again after the wait so the result reflects the
// completed request.
future<std::optional<tasks::task_status>> node_ops_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto entry = co_await get_status(id, hint);
    if (!entry) {
        co_return std::nullopt;
    }

    co_await _ss.wait_for_topology_request_completion(id.uuid(), false);
    co_return co_await get_status(id, std::move(hint));
}

// Forwards the abort to storage_service::abort_topology_request(); the hint
// is not used.
future<> node_ops_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
    co_await _ss.abort_topology_request(id.uuid());
}

// Returns task_stats for every entry that get_entries() yields for the task
// manager's user task ttl. An entry's hint gets a node id only when the
// in-memory topology maps the entry's id to a host.
future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
    db::system_keyspace& sys_ks = _ss._sys_ks.local();
    co_return std::ranges::to<std::vector<tasks::task_stats>>(co_await get_entries(sys_ks, get_task_manager().get_user_task_ttl())
            | std::views::transform([reqs = get_requests(_ss._topology_state_machine._topology)] (const auto& e) {
        auto id = tasks::task_id{e.first};
        auto& entry = e.second;
        tasks::virtual_task_hint hint;
        // Single lookup instead of contains() followed by at().
        if (auto it = reqs.find(id); it != reqs.end()) {
            hint.node_id = it->second;
        }
        return get_task_stats(entry, hint);
    }));
}

// Creates a streaming task with a random task id under `parent_id`.
//
// `result` is held by reference and `action` is stored by move; see run()
// for how they are used.
streaming_task_impl::streaming_task_impl(tasks::task_manager::module_ptr module,
        tasks::task_id parent_id,
        streaming::stream_reason reason,
        std::optional<shared_future<>>& result,
        std::function<future<>()> action) noexcept
    : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", "", "", "", parent_id)
    , _reason(reason)
    , _result(result)
    , _action(std::move(action))
{}

// The task type is "<stream reason>: streaming".
std::string streaming_task_impl::type() const {
    return fmt::format("{}: streaming", _reason);
}

// Streaming tasks are never reported as internal.
tasks::is_internal streaming_task_impl::is_internal() const noexcept {
    return tasks::is_internal::no;
}

// Starts, restarts or joins the streaming operation tracked by `_result`,
// then waits for it to finish.
future<> streaming_task_impl::run() {
    // If no operation was previously started - start it now
    // If previous operation still running - wait for it and return its result
    // If previous operation completed successfully - return immediately
    // If previous operation failed - restart it
    if (!_result || _result->failed()) {
        if (_result) {
            service::rtlogger.info("retry streaming after previous attempt failed with {}", _result->get_future().get_exception());
        } else {
            service::rtlogger.info("start streaming");
        }
        _result = _action();
    } else {
        service::rtlogger.debug("already streaming");
    }
    co_await _result.value().get_future();
    service::rtlogger.info("streaming completed");
}

// Registers the module with the task manager under the name "node_ops".
task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
    : tasks::task_manager::module(tm, "node_ops")
    , _ss(ss)
{}

// Delegates to task_manager::get_nodes() with this module's storage service.
std::set<locator::host_id> task_manager_module::get_nodes() const {
    return get_task_manager().get_nodes(_ss);
}

}