/*
 * Copyright (C) 2024-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "locator/tablets.hh"
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "repair/row_level.hh"
#include "service/task_manager_module.hh"
#include "service/topology_state_machine.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include <seastar/coroutine/maybe_yield.hh>

namespace service {

// Result of get_status_helper(): the task status plus, for migration tasks,
// the pending replica observed when the status was taken. wait() uses the
// latter to decide whether the migration succeeded.
struct status_helper {
    tasks::task_status status;
    std::optional<locator::tablet_replica> pending_replica;
};

tasks::task_manager::task_group tablet_virtual_task::get_group() const noexcept {
    return tasks::task_manager::task_group::tablets_group;
}

// Returns the request type stored in task_info if task_info is valid and
// carries the given task id; std::nullopt otherwise.
static std::optional<locator::tablet_task_type> maybe_get_task_type(const locator::tablet_task_info& task_info, tasks::task_id task_id) {
    return task_info.is_valid() && task_info.tablet_task_id.uuid() == task_id.uuid()
        ? std::make_optional(task_info.request_type)
        : std::nullopt;
}

// Maps a tablet task type to the scope string reported to the user
// ("table" or "tablet"). Calling it with the `none` type is an internal error.
static sstring get_scope(locator::tablet_task_type task_type) {
    switch (task_type) {
        case locator::tablet_task_type::user_repair:
        case locator::tablet_task_type::split:
        case locator::tablet_task_type::merge:
            return "table";
        case locator::tablet_task_type::auto_repair:
        case locator::tablet_task_type::migration:
        case locator::tablet_task_type::intranode_migration:
            return "tablet";
        case locator::tablet_task_type::none:
            on_internal_error(tasks::tmlogger, "attempted to get the scope for none task type");
    }
}

// Builds task_stats for a valid task_info, reporting it as a running
// cluster-level task on the table described by schema. Returns std::nullopt
// when task_info is not valid.
static std::optional<tasks::task_stats> maybe_make_task_stats(const locator::tablet_task_info& task_info, schema_ptr schema) {
    if (!task_info.is_valid()) {
        return std::nullopt;
    }

    return tasks::task_stats{
        .task_id = tasks::task_id{task_info.tablet_task_id.uuid()},
        .type = locator::tablet_task_type_to_string(task_info.request_type),
        .kind = tasks::task_kind::cluster,
        .scope = get_scope(task_info.request_type),
        .state = tasks::task_manager::task_state::running,
        .keyspace = schema->ks_name(),
        .table = schema->cf_name(),
        .start_time = task_info.request_time
    };
}

static bool is_repair_task(const locator::tablet_task_type& task_type) {
    return task_type == locator::tablet_task_type::user_repair || task_type == locator::tablet_task_type::auto_repair;
}

static bool is_migration_task(const locator::tablet_task_type& task_type) {
    return task_type == locator::tablet_task_type::migration || task_type == locator::tablet_task_type::intranode_migration;
}

static bool is_resize_task(const locator::tablet_task_type& task_type) {
    return task_type == locator::tablet_task_type::split || task_type == locator::tablet_task_type::merge;
}

// Only migration tasks carry a tablet id in their virtual_task_hint.
static bool tablet_id_provided(const locator::tablet_task_type& task_type) {
    return is_migration_task(task_type);
}

// Looks for a tablet task with the given id.
//
// After triggering the group0 barrier, every tablet map is scanned: first the
// table-wide resize task, then the repair and migration task of each tablet.
// If nothing matches, the repair task progress is consulted and a user_repair
// hint is returned when it reports at least one requested tablet.
//
// Returns a hint describing where the task was found, or std::nullopt.
future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(tasks::task_id task_id) const {
    co_await _ss._migration_manager.local().get_group0_barrier().trigger();

    auto tmptr = _ss.get_token_metadata_ptr();
    auto tables = get_table_ids();
    for (auto table : tables) {
        auto& tmap = tmptr->tablets().get_tablet_map(table);
        if (auto task_type = maybe_get_task_type(tmap.resize_task_info(), task_id); task_type.has_value()) {
            co_return tasks::virtual_task_hint{
                .table_id = table,
                .task_type = task_type.value(),
                .tablet_id = std::nullopt,
            };
        }

        // tid is advanced in lockstep with the iteration over tmap.tablets().
        std::optional<locator::tablet_id> tid = tmap.first_tablet();
        for (const locator::tablet_info& info : tmap.tablets()) {
            auto task_type = maybe_get_task_type(info.repair_task_info, task_id).or_else([&] () {
                return maybe_get_task_type(info.migration_task_info, task_id);
            });
            if (task_type.has_value()) {
                co_return tasks::virtual_task_hint{
                    .table_id = table,
                    .task_type = task_type.value(),
                    .tablet_id = tablet_id_provided(task_type.value()) ? std::make_optional(*tid) : std::nullopt,
                };
            }
            co_await coroutine::maybe_yield();
            tid = tmap.next_tablet(*tid);
        }
    }

    // Check if the task id is present in the repair task table
    auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
    if (progress && progress->requested > 0) {
        co_return tasks::virtual_task_hint{
            .table_id = progress->table_uuid,
            .task_type = locator::tablet_task_type::user_repair,
            .tablet_id = std::nullopt,
        };
    }
    co_return std::nullopt;
}

// Only repair tasks are reported as abortable.
future<tasks::is_abortable> tablet_virtual_task::is_abortable(tasks::virtual_task_hint hint) const {
    auto task_type = hint.get_task_type();
    return make_ready_future<tasks::is_abortable>(is_repair_task(task_type));
}

// Returns the current status of the task, or std::nullopt if
// get_status_helper() could not find it.
future<std::optional<tasks::task_status>> tablet_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto res = co_await get_status_helper(id, std::move(hint));
    co_return res.transform([] (status_helper s) {
        return s.status;
    });
}

// Waits until the tablet operation identified by id is no longer present in
// the tablet metadata and returns its final status.
//
// Returns std::nullopt if the task cannot be found up front. The final state
// is derived from the metadata observed after the wait:
//  - migration: failed if the pending replica is known and is not among the
//    tablet's replicas, done otherwise;
//  - resize: suspended if the tablet count did not change, done otherwise;
//  - repair: done.
// If the table's tablet map disappeared meanwhile, the task is reported done.
future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto table = hint.get_table_id();
    auto task_type = hint.get_task_type();
    auto tablet_id_opt = tablet_id_provided(task_type) ? std::make_optional(hint.get_tablet_id()) : std::nullopt;

    // Remember the tablet count before waiting so that a resize can be detected afterwards.
    const auto& tablets = _ss.get_token_metadata().tablets();
    size_t tablet_count = tablets.has_tablet_map(table) ? tablets.get_tablet_map(table).tablet_count() : 0;

    auto res = co_await get_status_helper(id, std::move(hint));
    if (!res) {
        co_return std::nullopt;
    }

    tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished");
    co_await utils::get_local_injector().inject("tablet_virtual_task_wait", utils::wait_for_message(60s));
    while (true) {
        co_await _ss._topology_state_machine.event.wait([&] {
            if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
                return true;
            }
            auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
            if (is_resize_task(task_type)) { // Resize task.
                return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid();
            } else if (tablet_id_opt.has_value()) { // Migration task.
                return tmap.get_tablet_info(tablet_id_opt.value()).migration_task_info.tablet_task_id.uuid() != id.uuid();
            } else { // Repair task.
                // Completion of a repair cannot be decided from a single tablet;
                // wake up on every event and scan all tablets below.
                return true;
            }
        });

        if (!is_repair_task(task_type)) {
            break;
        }

        // Hold one metadata snapshot for both the existence check and the scan.
        auto tmptr = _ss.get_token_metadata_ptr();
        if (!tmptr->tablets().has_tablet_map(table)) {
            break;
        }
        auto& tmap = tmptr->tablets().get_tablet_map(table);
        bool repair_still_running = false;
        co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
            repair_still_running = repair_still_running || (info.repair_task_info.is_valid() && info.repair_task_info.tablet_task_id.uuid() == id.uuid());
            return make_ready_future();
        });
        if (!repair_still_running) {
            break;
        }
    }

    res->status.state = tasks::task_manager::task_state::done; // Failed repair task is retried.
    if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
        res->status.end_time = db_clock::now();
        co_return res->status;
    }

    if (is_migration_task(task_type)) {
        auto& replicas = _ss.get_token_metadata().tablets().get_tablet_map(table).get_tablet_info(tablet_id_opt.value()).replicas;
        auto migration_failed = std::all_of(replicas.begin(), replicas.end(), [&] (const auto& replica) {
            return res->pending_replica.has_value() && replica != res->pending_replica.value();
        });
        res->status.state = migration_failed ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::done;
    } else if (is_resize_task(task_type)) {
        auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
        res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done;
        res->status.children = task_type == locator::tablet_task_type::split
            ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr())
            : utils::chunked_vector<tasks::task_identity>{};
    } else {
        res->status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
    }
    res->status.end_time = db_clock::now(); // FIXME: Get precise end time.
    co_return res->status;
}

// Aborts a repair task by deleting its repair request.
// Calling it for a non-repair task is an internal error.
future<> tablet_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
    auto table = hint.get_table_id();
    auto task_type = hint.get_task_type();
    if (!is_repair_task(task_type)) {
        on_internal_error(tasks::tmlogger, format("non-abortable task {} of type {} cannot be aborted", id, task_type));
    }
    co_await _ss.del_repair_tablet_request(table, locator::tablet_task_id{id.uuid()});
}

// Collects stats of all tablet tasks currently present in the tablet metadata:
// one entry per resize task, per migration task, per non-user repair of a
// tablet, and one aggregated entry per user repair request.
future<std::vector<tasks::task_stats>> tablet_virtual_task::get_stats() {
    std::vector<tasks::task_stats> res;
    auto tmptr = _ss.get_token_metadata_ptr();
    auto tables = get_table_ids();
    for (auto table : tables) {
        auto& tmap = tmptr->tablets().get_tablet_map(table);

        // A table listed in the tablet metadata may have no database entry;
        // skip it instead of failing the whole listing, consistently with
        // get_status_helper().
        auto table_ptr = _ss._db.local().get_tables_metadata().get_table_if_exists(table);
        if (!table_ptr) {
            continue;
        }
        auto schema = table_ptr->schema();

        std::unordered_map<tasks::task_id, tasks::task_stats> user_requests;
        std::unordered_map<tasks::task_id, size_t> sched_num_sum;
        auto resize_stats = maybe_make_task_stats(tmap.resize_task_info(), schema);
        if (resize_stats) {
            res.push_back(std::move(resize_stats.value()));
        }
        co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
            auto repair_stats = maybe_make_task_stats(info.repair_task_info, schema);
            if (repair_stats) {
                if (info.repair_task_info.is_user_repair_request()) {
                    // User requested repair may encompass more than one tablet.
                    auto task_id = tasks::task_id{info.repair_task_info.tablet_task_id.uuid()};
                    user_requests[task_id] = std::move(repair_stats.value());
                    sched_num_sum[task_id] += info.repair_task_info.sched_nr;
                } else {
                    res.push_back(std::move(repair_stats.value()));
                }
            }

            auto migration_stats = maybe_make_task_stats(info.migration_task_info, schema);
            if (migration_stats) {
                res.push_back(std::move(migration_stats.value()));
            }

            return make_ready_future();
        });

        // A user repair is "created" until sched_nr of any of its tablets becomes non-zero.
        for (auto& [id, task_stats] : user_requests) {
            task_stats.state = sched_num_sum[id] == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
            res.push_back(std::move(task_stats));
        }
    }
    // FIXME: Show finished tasks.
    co_return res;
}

// Returns the ids (the keys of all_table_groups()) of tables known to the
// tablet metadata.
std::vector<table_id> tablet_virtual_task::get_table_ids() const {
    return _ss.get_token_metadata().tablets().all_table_groups()
        | std::views::transform([] (const auto& table_to_tablets) { return table_to_tablets.first; })
        | std::ranges::to<std::vector<table_id>>();
}

// Copies type, scope and start time from task_info into status and adds the
// task's sched_nr to the running sum.
static void update_status(const locator::tablet_task_info& task_info, tasks::task_status& status, size_t& sched_nr) {
    sched_nr += task_info.sched_nr;
    status.type = locator::tablet_task_type_to_string(task_info.request_type);
    status.scope = get_scope(task_info.request_type);
    status.start_time = task_info.request_time;
}

// Computes the status of the task identified by id, using hint to locate it.
//
// If the table no longer exists in the database, a minimal status (id, kind,
// abortability) is returned. Otherwise the tablet metadata is inspected
// according to the task type. Returns std::nullopt when the task is found
// neither in the tablet metadata nor (for repairs) in the repair progress.
future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(tasks::task_id id, tasks::virtual_task_hint hint) {
    status_helper res;
    auto table = hint.get_table_id();
    auto task_type = hint.get_task_type();
    auto table_ptr = _ss._db.local().get_tables_metadata().get_table_if_exists(table);
    if (!table_ptr) {
        co_return tasks::task_status {
            .task_id = id,
            .kind = tasks::task_kind::cluster,
            .is_abortable = co_await is_abortable(std::move(hint)),
        };
    }
    auto schema = table_ptr->schema();
    res.status = {
        .task_id = id,
        .kind = tasks::task_kind::cluster,
        // hint is read again below (migration branch), so it is copied rather than moved.
        .is_abortable = co_await is_abortable(hint),
        .keyspace = schema->ks_name(),
        .table = schema->cf_name(),
    };

    size_t sched_nr = 0;
    auto tmptr = _ss.get_token_metadata_ptr();
    auto& tmap = tmptr->tablets().get_tablet_map(table);
    bool repair_task_finished = false;
    bool repair_task_pending = false;
    bool no_tablets_processed = true;
    if (is_repair_task(task_type)) {
        auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
        if (progress) {
            res.status.progress.completed = progress->finished;
            res.status.progress.total = progress->requested;
            res.status.progress_units = "tablets";
            if (progress->requested > 0 && progress->requested == progress->finished) {
                repair_task_finished = true;
            }
            if (progress->requested > 0 && progress->requested > progress->finished) {
                repair_task_pending = true;
            }
        }
        co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
            auto& task_info = info.repair_task_info;
            if (task_info.tablet_task_id.uuid() == id.uuid()) {
                update_status(task_info, res.status, sched_nr);
                no_tablets_processed = false;
            }
            return make_ready_future();
        });
        res.status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
    } else if (is_migration_task(task_type)) { // Migration task.
        auto tablet_id = hint.get_tablet_id();
        // The transition may already be gone if the migration completed after
        // contains() produced the hint; do not dereference a missing entry.
        if (const auto* trinfo = tmap.get_tablet_transition_info(tablet_id)) {
            res.pending_replica = trinfo->pending_replica;
        }
        auto& task_info = tmap.get_tablet_info(tablet_id).migration_task_info;
        if (task_info.tablet_task_id.uuid() == id.uuid()) {
            update_status(task_info, res.status, sched_nr);
            no_tablets_processed = false;
        }
    } else { // Resize task.
        auto& task_info = tmap.resize_task_info();
        if (task_info.tablet_task_id.uuid() == id.uuid()) {
            update_status(task_info, res.status, sched_nr);
            res.status.state = tasks::task_manager::task_state::running;
            res.status.children = task_type == locator::tablet_task_type::split
                ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr())
                : utils::chunked_vector<tasks::task_identity>{};
            co_return res;
        }
    }

    if (!no_tablets_processed) {
        res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
        co_return res;
    }

    if (repair_task_pending) {
        // No tablet carries this task any more although the repair progress
        // still reports unfinished tablets; this happens iff the request was
        // aborted by the user.
        res.status.state = tasks::task_manager::task_state::failed;
        co_return res;
    }
    if (repair_task_finished) {
        res.status.state = tasks::task_manager::task_state::done;
        co_return res;
    }
    co_return std::nullopt;
}

task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
    : tasks::task_manager::module(tm, "tablets")
    , _ss(ss)
{}

std::set<locator::host_id> task_manager_module::get_nodes() const {
    return get_task_manager().get_nodes(_ss);
}

namespace topo {

// Derives the task state from a topology request entry:
// no id -> created, not done -> running, done without error -> done,
// done with a non-empty error -> failed.
static tasks::task_manager::task_state get_state(const db::system_keyspace::topology_requests_entry& entry) {
    if (!entry.id) {
        return tasks::task_manager::task_state::created;
    } else if (!entry.done) {
        return tasks::task_manager::task_state::running;
    } else if (entry.error.empty()) {
        return tasks::task_manager::task_state::done;
    } else {
        return tasks::task_manager::task_state::failed;
    }
}

tasks::task_manager::task_group global_topology_request_virtual_task::get_group() const noexcept {
    return tasks::task_manager::task_group::global_topology_change_group;
}

// Returns an (empty) hint if task_id names a keyspace_rf_change global
// topology request, std::nullopt otherwise.
future<std::optional<tasks::virtual_task_hint>> global_topology_request_virtual_task::contains(tasks::task_id task_id) const {
    if (!task_id.uuid().is_timestamp()) {
        // Task id of node ops operation is always a timestamp.
        co_return std::nullopt;
    }

    auto hint = std::make_optional<tasks::virtual_task_hint>({});
    auto entry = co_await _ss._sys_ks.local().get_topology_request_entry_opt(task_id.uuid());
    if (entry.has_value() && std::holds_alternative<global_topology_request>(entry->request_type) &&
            std::get<global_topology_request>(entry->request_type) == global_topology_request::keyspace_rf_change) {
        co_return hint;
    }
    co_return std::nullopt;
}

// Global topology request tasks are always reported as abortable.
future<tasks::is_abortable> global_topology_request_virtual_task::is_abortable(tasks::virtual_task_hint) const {
    return make_ready_future<tasks::is_abortable>(tasks::is_abortable::yes);
}

// Converts a topology request entry into task_stats.
// The entry's request_type must hold a global_topology_request.
static tasks::task_stats get_task_stats(const db::system_keyspace::topology_requests_entry& entry) {
    return tasks::task_stats{
        .task_id = tasks::task_id{entry.id},
        .type = fmt::to_string(std::get<global_topology_request>(entry.request_type)),
        .kind = tasks::task_kind::cluster,
        .scope = "keyspace",
        .state = get_state(entry),
        .sequence_number = 0,
        .keyspace = entry.new_keyspace_rf_change_ks_name.value_or(""),
        .table = "",
        .entity = "",
        .shard = 0,
        .start_time = entry.start_time,
        .end_time = entry.end_time,
    };
}

// Returns the status built from the topology request entry with the given id,
// or std::nullopt if there is no such entry.
future<std::optional<tasks::task_status>> global_topology_request_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto entry = co_await _ss._sys_ks.local().get_topology_request_entry_opt(id.uuid());
    if (!entry.has_value()) {
        co_return std::nullopt;
    }
    auto task_stats = get_task_stats(*entry);
    co_return tasks::task_status{
        .task_id = task_stats.task_id,
        .type = task_stats.type,
        .kind = task_stats.kind,
        .scope = task_stats.scope,
        .state = task_stats.state,
        .is_abortable = co_await is_abortable(std::move(hint)),
        .start_time = task_stats.start_time,
        .end_time = task_stats.end_time,
        .error = entry->error,
        .parent_id = tasks::task_id::create_null_id(),
        .sequence_number = task_stats.sequence_number,
        .shard = task_stats.shard,
        .keyspace = task_stats.keyspace,
        .table = task_stats.table,
        .entity = task_stats.entity,
        .progress_units = "",
        .progress = tasks::task_manager::task::progress{},
        .children = utils::chunked_vector<tasks::task_identity>{},
    };
}

// Waits for the topology request to complete and returns its status as
// re-read afterwards. Returns std::nullopt if the request is not found up front.
future<std::optional<tasks::task_status>> global_topology_request_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto entry = co_await get_status(id, hint);
    if (!entry) {
        co_return std::nullopt;
    }

    co_await _ss.wait_for_topology_request_completion(id.uuid(), false);
    co_return co_await get_status(id, std::move(hint));
}

future<> global_topology_request_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint) noexcept {
    return _ss.abort_paused_rf_change(id.uuid());
}

// Lists stats of keyspace_rf_change requests returned by
// get_topology_request_entries() for the cut-off time now - user task TTL.
future<std::vector<tasks::task_stats>> global_topology_request_virtual_task::get_stats() {
    db::system_keyspace& sys_ks = _ss._sys_ks.local();
    co_return std::ranges::to<std::vector<tasks::task_stats>>(co_await sys_ks.get_topology_request_entries({global_topology_request::keyspace_rf_change}, db_clock::now() - get_task_manager().get_user_task_ttl())
            | std::views::transform([] (const auto& e) {
        auto& entry = e.second;
        return get_task_stats(entry);
    }));
}

task_manager_module::task_manager_module(tasks::task_manager& tm) noexcept
    : tasks::task_manager::module(tm, "global_topology_requests")
{}

}

}