diff --git a/api/storage_service.cc b/api/storage_service.cc
index e7b868a05f..e14e06996e 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -1743,11 +1743,11 @@ rest_get_vnode_tablet_migration(http_context& ctx, sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db,
         compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) noexcept
-    : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
+    : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), parent_id ? 0 : module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", parent_id)
     , _dir(dir)
     , _db(db)
     , _creator(std::move(creator))
diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc
index 87ce94ecd4..54e7919363 100644
--- a/replica/distributed_loader.cc
+++ b/replica/distributed_loader.cc
@@ -23,6 +23,7 @@
 #include "db/system_keyspace.hh"
 #include "db/system_distributed_keyspace.hh"
 #include "db/schema_tables.hh"
+#include "service/task_manager_module.hh"
 #include "compaction/compaction_manager.hh"
 #include "compaction/task_manager_module.hh"
 #include "sstables/sstables.hh"
@@ -101,9 +102,9 @@ distributed_loader::lock_table(global_table_ptr& table, sharded<sstables::sstable_directory>& dir) {
 
 future<>
-distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) {
+distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding, tasks::task_info parent_info) {
     auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
-    auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
+    auto task = co_await compaction_module.make_and_start_task(parent_info, std::move(ks_name), std::move(table_name), parent_info.id, dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
     co_await task->done();
 }
 
@@ -279,21 +280,29 @@ distributed_loader::get_sstables_from(sharded<replica::database>& db, sstring ks
 
 class table_populator {
+public:
+    enum class migration_direction {
+        none,     // no migration
+        forward,  // vnodes -> tablets
+        rollback  // tablets -> vnodes (rollback)
+    };
+
+private:
     sharded<replica::database>& _db;
     sstring _ks;
     sstring _cf;
     global_table_ptr& _global_table;
     std::vector>> _sstable_directories;
     sstables::sstable_version_types _version_for_reshaping = sstables::oldest_writable_sstable_format;
-    bool _migrate_to_tablets = false;
+    migration_direction _migration_direction;
 
 public:
-    table_populator(global_table_ptr& ptr, sharded<replica::database>& db, sstring ks, sstring cf, bool migrate_to_tablets = false)
+    table_populator(global_table_ptr& ptr, sharded<replica::database>& db, sstring ks, sstring cf, migration_direction md = migration_direction::none)
         : _db(db)
         , _ks(std::move(ks))
         , _cf(std::move(cf))
         , _global_table(ptr)
-        , _migrate_to_tablets(migrate_to_tablets)
+        , _migration_direction(md)
     {
     }
 
@@ -396,10 +405,13 @@ sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_s
 
 future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>& directory) {
     auto state = directory.local().state();
-    dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, _migrate_to_tablets ? "vnodes-to-tablets" : "normal");
+    bool vnodes_resharding = _migration_direction == migration_direction::forward;
+    dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, vnodes_resharding ? "vnodes-to-tablets" : "normal");
+    tasks::task_info parent_info;
 
     compaction::owned_ranges_ptr owned_ranges_ptr = nullptr;
-    if (_migrate_to_tablets) {
+
+    if (vnodes_resharding) {
         // Build owned_ranges from the tablet map.
         auto table_uuid = _global_table->schema()->id();
         auto& tmap = _db.local().get_shared_token_metadata().get()->tablets().get_tablet_map(table_uuid);
@@ -411,13 +423,17 @@ future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>&
         owned_ranges_ptr = compaction::make_owned_ranges_ptr(std::move(ranges));
     }
 
+    if (_migration_direction != migration_direction::none) {
+        parent_info = tasks::task_info(service::vnodes_to_tablets::migration_virtual_task::make_task_id(_ks), 0);
+    }
+
     co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable {
         auto gen = smp::submit_to(shard, [this] () {
             return _global_table->calculate_generation_for_new_table();
         }).get();
         return make_sstable(*_global_table, state, gen, _version_for_reshaping);
-    }, owned_ranges_ptr, _migrate_to_tablets);
+    }, owned_ranges_ptr, vnodes_resharding, parent_info);
 
     // The node is offline at this point so we are very lenient with what we consider
     // offstrategy.
@@ -449,7 +465,8 @@ future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>&
 }
 
 future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,
-        sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name)
+        sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name,
+        std::optional<service::intended_storage_mode> storage_mode)
 {
     dblog.info("Populating Keyspace {}", ks_name);
@@ -461,12 +478,19 @@ future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,
         dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf.get_storage_options());
 
-        bool migrating_to_tablets = cf.uses_tablets() && !ks.uses_tablets();
-        if (migrating_to_tablets) {
-            dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode", ks_name, cfname);
+        using md = table_populator::migration_direction;
+        auto& tablet_metadata = db.local().get_shared_token_metadata().get()->tablets();
+        auto direction = [&]() {
+            if (ks.uses_tablets()) { return md::none; }
+            if (cf.uses_tablets()) { return md::forward; }
+            if (tablet_metadata.has_tablet_map(uuid) && storage_mode == service::intended_storage_mode::vnodes) { return md::rollback; }
+            return md::none;
+        }();
+        if (direction != md::none) {
+            dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode (direction: {})", ks_name, cfname, direction == md::forward ? "forward" : "rollback");
         }
-        auto metadata = table_populator(gtable, db, ks_name, cfname, migrating_to_tablets);
+        auto metadata = table_populator(gtable, db, ks_name, cfname, direction);
 
         std::exception_ptr ex;
         try {
@@ -583,7 +607,7 @@ future<> distributed_loader::init_non_system_keyspaces(sharded<replica::database>& db,
diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh
--- a/replica/distributed_loader.hh
+++ b/replica/distributed_loader.hh
     static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, compaction::reshape_mode mode, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function filter);
     static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator,
-            compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false);
+            compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false, tasks::task_info parent_info = {});
     static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
     static future<> lock_table(global_table_ptr&, sharded<sstables::sstable_directory>& dir);
     static future make_sstables_available(sstables::sstable_directory& dir, sharded<replica::database>& db, sharded& vb, sharded& vbw, db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf);
-    static future<> populate_keyspace(sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name);
+    static future<> populate_keyspace(sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name,
+            std::optional<service::intended_storage_mode> storage_mode = std::nullopt);
     static future>>> get_sstables_from(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg, noncopyable_function(global_table_ptr&, sharded&)> start_dir);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index e2a3d37c50..5a1a65c80e 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -225,6 +225,7 @@ storage_service::storage_service(abort_source& abort_source,
     , _node_ops_module(make_shared(tm, *this))
     , _tablets_module(make_shared(tm, *this))
     , _global_topology_requests_module(make_shared(tm))
+    , _vnodes_to_tablets_migration_module(make_shared<vnodes_to_tablets::task_manager_module>(tm, *this))
     , _address_map(address_map)
     , _shared_token_metadata(stm)
     , _erm_factory(erm_factory)
@@ -250,10 +251,12 @@ storage_service::storage_service(abort_source& abort_source,
     tm.register_module(_node_ops_module->get_name(), _node_ops_module);
     tm.register_module(_tablets_module->get_name(), _tablets_module);
     tm.register_module(_global_topology_requests_module->get_name(), _global_topology_requests_module);
+    tm.register_module(_vnodes_to_tablets_migration_module->get_name(), _vnodes_to_tablets_migration_module);
     if (this_shard_id() == 0) {
         _node_ops_module->make_virtual_task(*this);
         _tablets_module->make_virtual_task(*this);
         _global_topology_requests_module->make_virtual_task(*this);
+        _vnodes_to_tablets_migration_module->make_virtual_task(*this);
     }
 
     register_metrics();
@@ -2342,6 +2345,7 @@ future<> storage_service::stop() {
     co_await _tablets_module->stop();
     co_await _node_ops_module->stop();
     co_await _global_topology_requests_module->stop();
+    co_await _vnodes_to_tablets_migration_module->stop();
     co_await _async_gate.close();
     _tablet_split_monitor_event.signal();
     co_await std::move(_tablet_split_monitor);
@@ -4109,22 +4113,12 @@ future<> storage_service::set_node_intended_storage_mode(intended_storage_mode m
     slogger.info("Successfully set intended storage mode for node {} to {}", raft_server.id(), mode);
 }
 
-future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status(const sstring& ks_name) {
-    if (this_shard_id() != 0) {
-        co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
-            return ss.get_tablets_migration_status(ks_name);
-        });
-    }
-
+storage_service::migration_status storage_service::get_tablets_migration_status(const sstring& ks_name) {
     auto& db = _db.local();
     auto& ks = db.find_keyspace(ks_name);
 
-    keyspace_migration_status result;
-    result.keyspace = ks_name;
-
     if (ks.uses_tablets()) {
-        result.status = "tablets";
-        co_return result;
+        return migration_status::tablets;
     }
 
     const auto& tm = get_token_metadata();
@@ -4138,14 +4132,38 @@ future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status(const sstring& ks_name) {
     });
 
     if (!has_tablet_maps) {
-        result.status = "vnodes";
+        return migration_status::vnodes;
+    }
+
+    return migration_status::migrating_to_tablets;
+}
+
+future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status_with_node_details(const sstring& ks_name) {
+    if (this_shard_id() != 0) {
+        co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
+            return ss.get_tablets_migration_status_with_node_details(ks_name);
+        });
+    }
+
+    keyspace_migration_status result;
+    result.keyspace = ks_name;
+    result.status = get_tablets_migration_status(ks_name);
+
+    if (result.status != migration_status::migrating_to_tablets) {
         co_return result;
     }
 
-    result.status = "migrating_to_tablets";
+    // system.tablet_sizes is a group0_virtual_table, so it requires group0 to
+    // be initialized. If this function is called via the task manager API,
+    // group0 may not be initialized yet.
+    if (!_group0 || !_group0->joined_group0()) {
+        throw std::runtime_error(::format("Cannot fetch node statuses for migrating keyspace '{}': group0 is not yet initialized on this node", ks_name));
+    }
 
     // Pick one table and query system.tablet_sizes to find which nodes
     // report tablet sizes (i.e. have loaded tablet-based ERMs).
+    auto& ks = _db.local().find_keyspace(ks_name);
+    auto tables = ks.metadata()->tables();
     auto sample_table_id = tables.front()->id();
 
     // FIXME: system.tablet_sizes might return stale data (load stats in the topology coordinator are cached).
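The service/task_manager_module.{cc,hh} changes below expose this migration as a task-manager virtual task, and the interesting property is that the task is never registered anywhere: its id is a name-based (version 3) UUID derived from the keyspace name, so every node computes the same id with no coordination, and contains() maps an id back to its keyspace simply by recomputing the id for each keyspace. Here is a minimal standalone sketch of that scheme; std::hash stands in for the MD5-based utils::UUID_gen::get_name_UUID(), and all names are illustrative rather than Scylla's:

// Sketch of a deterministic, coordination-free task-id scheme.
// Assumption: std::hash replaces the real name-based (v3) UUID derivation.
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <vector>

using task_id = std::size_t;

// Same keyspace name -> same id, on every node, with no shared state.
// The fixed prefix namespaces these ids away from other derived ids.
task_id make_task_id(const std::string& keyspace) {
    return std::hash<std::string>{}("vnodes_to_tablets_migration:" + keyspace);
}

// Reverse lookup: recompute the id of each keyspace and compare.
// O(#keyspaces), cheap for the ~1000 keyspaces Scylla supports.
std::optional<std::string> find_keyspace_for_task_id(const std::vector<std::string>& keyspaces, task_id id) {
    for (const auto& ks : keyspaces) {
        if (make_task_id(ks) == id) {
            return ks;
        }
    }
    return std::nullopt;
}

The trade-off is that such a task id carries no state of its own, which is why the virtual task below recomputes everything (status, progress, keyspace) on each query instead of reading a stored task record.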
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 6d07225dcd..8f70697d4e 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -228,6 +228,7 @@ private:
     shared_ptr _node_ops_module;
     shared_ptr _tablets_module;
    shared_ptr _global_topology_requests_module;
+    shared_ptr<vnodes_to_tablets::task_manager_module> _vnodes_to_tablets_migration_module;
     gms::gossip_address_map& _address_map;
 
     future do_tablet_operation(locator::global_tablet_id tablet, sstring op_name,
@@ -297,13 +298,20 @@ public:
         sstring intended_mode; // "vnodes" or "tablets"
     };
 
+    enum class migration_status {
+        vnodes,
+        migrating_to_tablets,
+        tablets,
+    };
+
     struct keyspace_migration_status {
         sstring keyspace;
-        sstring status; // "vnodes", "migrating_to_tablets", or "tablets"
+        migration_status status;
         std::vector nodes;
     };
 
-    future<keyspace_migration_status> get_tablets_migration_status(const sstring& ks_name);
+    migration_status get_tablets_migration_status(const sstring& ks_name);
+    future<keyspace_migration_status> get_tablets_migration_status_with_node_details(const sstring& ks_name);
     future<> set_node_intended_storage_mode(intended_storage_mode mode);
     future<> finalize_tablets_migration(const sstring& ks_name);
 
@@ -1054,6 +1062,7 @@ public:
     friend class tasks::task_manager;
     friend class tablet_virtual_task;
     friend class topo::global_topology_request_virtual_task;
+    friend class vnodes_to_tablets::migration_virtual_task;
 };
 
 }
 
@@ -1080,3 +1089,18 @@ struct fmt::formatter : fmt::formatter
+
+template <>
+struct fmt::formatter<service::storage_service::migration_status> : fmt::formatter<string_view> {
+    template <typename FormatContext>
+    auto format(service::storage_service::migration_status status, FormatContext& ctx) const {
+        std::string_view name;
+        using enum service::storage_service::migration_status;
+        switch (status) {
+        case vnodes: name = "vnodes"; break;
+        case migrating_to_tablets: name = "migrating_to_tablets"; break;
+        case tablets: name = "tablets"; break;
+        }
+        return fmt::format_to(ctx.out(), "{}", name);
+    }
+};
\ No newline at end of file
diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc
index 6428c44622..bcce8059c0 100644
--- a/service/task_manager_module.cc
+++ b/service/task_manager_module.cc
@@ -15,6 +15,7 @@
 #include "service/topology_state_machine.hh"
 #include "tasks/task_handler.hh"
 #include "tasks/virtual_task_hint.hh"
+#include "utils/UUID_gen.hh"
 #include
 
 namespace service {
 
@@ -469,5 +470,188 @@ task_manager_module::task_manager_module(tasks::task_manager& tm) noexcept
 }
 
+namespace vnodes_to_tablets {
+
+tasks::task_id migration_virtual_task::make_task_id(const sstring& keyspace) {
+    // Prefix the keyspace name with a unique identifier for this task to avoid
+    // collisions with other name-based UUIDs that may be added in the future.
+    auto uuid = utils::UUID_gen::get_name_UUID("vnodes_to_tablets_migration:" + keyspace);
+    return tasks::task_id{uuid};
+}
+
+std::optional<sstring> migration_virtual_task::find_keyspace_for_task_id(tasks::task_id id) const {
+    auto& db = _ss._db.local();
+    // Note: we could cache this mapping, but that is probably overkill.
+    // Scylla supports up to 1000 keyspaces, which is a small enough number
+    // that recomputing a UUID per keyspace is cheap.
+    for (const auto& ks_name : db.get_all_keyspaces()) {
+        if (make_task_id(ks_name) == id) {
+            return ks_name;
+        }
+    }
+    return std::nullopt;
+}
+
+tasks::task_manager::task_group migration_virtual_task::get_group() const noexcept {
+    return tasks::task_manager::task_group::vnodes_to_tablets_migration_group;
+}
+
+future<std::optional<tasks::virtual_task_hint>> migration_virtual_task::contains(tasks::task_id task_id) const {
+    if (!task_id.uuid().is_name_based()) {
+        // The task id of a migration task is always a name-based UUID.
+        co_return std::nullopt;
+    }
+    auto ks_name = find_keyspace_for_task_id(task_id);
+    if (!ks_name) {
+        co_return std::nullopt;
+    }
+    auto status = _ss.get_tablets_migration_status(*ks_name);
+    if (status != storage_service::migration_status::migrating_to_tablets) {
+        co_return std::nullopt;
+    }
+    co_return tasks::virtual_task_hint{.keyspace_name = *ks_name};
+}
+
+tasks::task_stats migration_virtual_task::make_task_stats(tasks::task_id id, const sstring& keyspace) {
+    return tasks::task_stats{
+        .task_id = id,
+        .type = "vnodes_to_tablets_migration",
+        .kind = tasks::task_kind::cluster,
+        .scope = "keyspace",
+        .state = tasks::task_manager::task_state::running,
+        .sequence_number = 0,
+        .keyspace = keyspace,
+        .table = "",
+        .entity = "",
+        .shard = 0,
+        .start_time = {},
+        .end_time = {},
+    };
+}
+
+tasks::task_status migration_virtual_task::make_task_status(tasks::task_id id, const sstring& keyspace,
+                                                            tasks::task_manager::task_state state,
+                                                            tasks::task_manager::task::progress progress) {
+    auto stats = make_task_stats(id, keyspace);
+    return tasks::task_status{
+        .task_id = stats.task_id,
+        .type = stats.type,
+        .kind = stats.kind,
+        .scope = stats.scope,
+        .state = state,
+        .is_abortable = tasks::is_abortable::no,
+        .start_time = stats.start_time,
+        .end_time = stats.end_time,
+        .parent_id = tasks::task_id::create_null_id(),
+        .sequence_number = stats.sequence_number,
+        .shard = stats.shard,
+        .keyspace = stats.keyspace,
+        .table = stats.table,
+        .entity = stats.entity,
+        .progress_units = "nodes",
+        .progress = progress,
+    };
+}
+
+future<std::optional<tasks::task_status>> migration_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
+    auto& ks_name = hint.keyspace_name;
+    if (!ks_name) {
+        co_return std::nullopt;
+    }
+    storage_service::keyspace_migration_status status;
+    try {
+        status = co_await _ss.get_tablets_migration_status_with_node_details(*ks_name);
+    } catch (const replica::no_such_keyspace&) {
+        co_return std::nullopt;
+    }
+    if (status.status != storage_service::migration_status::migrating_to_tablets) {
+        co_return std::nullopt;
+    }
+
+    // The progress tracks the number of nodes currently using tablets.
+    // During forward migration it increases; during rollback it decreases.
+    double nodes_upgraded = 0;
+    double total_nodes = status.nodes.size();
+    for (const auto& node : status.nodes) {
+        if (node.current_mode == "tablets") {
+            nodes_upgraded++;
+        }
+    }
+
+    auto task_state = tasks::task_manager::task_state::running;
+    auto task_progress = tasks::task_manager::task::progress{nodes_upgraded, total_nodes};
+
+    // Note: children are left empty. Although the resharding tasks are children
+    // of the migration task, using get_children() here would be pointless:
+    // resharding runs during startup, before start_listen() is called, so the
+    // RPC fan-out that collects children from all nodes (including self) would fail.
+    co_return make_task_status(id, *ks_name, task_state, task_progress);
+}
+
+future<std::optional<tasks::task_status>> migration_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
+    auto& ks_name = hint.keyspace_name;
+    if (!ks_name) {
+        co_return std::nullopt;
+    }
+
+    storage_service::migration_status status;
+    while (true) {
+        try {
+            status = _ss.get_tablets_migration_status(*ks_name);
+        } catch (const replica::no_such_keyspace&) {
+            co_return std::nullopt;
+        }
+        if (status != storage_service::migration_status::migrating_to_tablets) {
+            break;
+        }
+        co_await _ss._topology_state_machine.event.wait();
+    }
+
+    bool migration_succeeded = status == storage_service::migration_status::tablets;
+    auto state = migration_succeeded ? tasks::task_manager::task_state::done : tasks::task_manager::task_state::suspended;
+    double total_nodes = _ss._topology_state_machine._topology.normal_nodes.size();
+    auto task_progress = tasks::task_manager::task::progress{migration_succeeded ? total_nodes : 0, total_nodes};
+
+    auto task_status = make_task_status(id, *ks_name, state, task_progress);
+    task_status.end_time = db_clock::now();
+
+    co_return task_status;
+}
+
+future<> migration_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
+    // Vnodes-to-tablets migration cannot be aborted via the task manager;
+    // undoing it requires the manual rollback procedure.
+    return make_ready_future<>();
+}
+
+future<std::vector<tasks::task_stats>> migration_virtual_task::get_stats() {
+    std::vector<tasks::task_stats> result;
+    auto& db = _ss._db.local();
+    for (const auto& ks_name : db.get_all_keyspaces()) {
+        storage_service::migration_status status;
+        try {
+            status = _ss.get_tablets_migration_status(ks_name);
+        } catch (const replica::no_such_keyspace&) {
+            continue;
+        }
+        if (status != storage_service::migration_status::migrating_to_tablets) {
+            continue;
+        }
+        result.push_back(make_task_stats(make_task_id(ks_name), ks_name));
+    }
+    co_return result;
+}
+
+task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
+    : tasks::task_manager::module(tm, "vnodes_to_tablets_migration")
+    , _ss(ss)
+{}
+
+std::set<locator::host_id> task_manager_module::get_nodes() const {
+    return get_task_manager().get_nodes(_ss);
+}
+
+}
 }
diff --git a/service/task_manager_module.hh b/service/task_manager_module.hh
index 20443a7d87..7eb9b57a8c 100644
--- a/service/task_manager_module.hh
+++ b/service/task_manager_module.hh
@@ -83,4 +83,42 @@ public:
 
 }
 
+namespace vnodes_to_tablets {
+
+class migration_virtual_task : public tasks::task_manager::virtual_task::impl {
+private:
+    service::storage_service& _ss;
+public:
+    migration_virtual_task(tasks::task_manager::module_ptr module,
+                           service::storage_service& ss)
+        : tasks::task_manager::virtual_task::impl(std::move(module))
+        , _ss(ss)
+    {}
+    virtual tasks::task_manager::task_group get_group() const noexcept override;
+    virtual future<std::optional<tasks::virtual_task_hint>> contains(tasks::task_id task_id) const override;
+
+    virtual future<std::optional<tasks::task_status>> get_status(tasks::task_id id, tasks::virtual_task_hint hint) override;
+    virtual future<std::optional<tasks::task_status>> wait(tasks::task_id id, tasks::virtual_task_hint hint) override;
+    virtual future<> abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept override;
+    virtual future<std::vector<tasks::task_stats>> get_stats() override;
+    static tasks::task_id make_task_id(const sstring& keyspace);
+private:
+    std::optional<sstring> find_keyspace_for_task_id(tasks::task_id id) const;
+    static tasks::task_stats make_task_stats(tasks::task_id id, const sstring& keyspace);
+    static tasks::task_status make_task_status(tasks::task_id id, const sstring& keyspace,
+                                               tasks::task_manager::task_state state,
+                                               tasks::task_manager::task::progress progress);
+};
+
+class task_manager_module : public tasks::task_manager::module {
+private:
+    service::storage_service& _ss;
+public:
+    task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
+
+    std::set<locator::host_id> get_nodes() const override;
+};
+
+}
+
 }
diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh
index 9c1591202f..bca6854e7a 100644
--- a/tasks/task_manager.hh
+++ b/tasks/task_manager.hh
@@ -114,6 +114,7 @@ public:
         topology_change_group,
         tablets_group,
         global_topology_change_group,
+        vnodes_to_tablets_migration_group,
     };
 
     class task : public enable_lw_shared_from_this<task> {
diff --git a/tasks/virtual_task_hint.hh b/tasks/virtual_task_hint.hh
index ebe6427319..2a23b13fa2 100644
--- a/tasks/virtual_task_hint.hh
+++ b/tasks/virtual_task_hint.hh
@@ -18,6 +18,7 @@ struct virtual_task_hint {
     std::optional<locator::tablet_task_type> task_type;
     std::optional<locator::tablet_id> tablet_id;
     std::optional<locator::host_id> node_id;
+    std::optional<sstring> keyspace_name;
 
     locator::tablet_task_type get_task_type() const;
     locator::tablet_id get_tablet_id() const;
diff --git a/test/boost/UUID_test.cc b/test/boost/UUID_test.cc
index 28ff626eb5..cf91aa23ca 100644
--- a/test/boost/UUID_test.cc
+++ b/test/boost/UUID_test.cc
@@ -18,6 +18,17 @@ BOOST_AUTO_TEST_CASE(test_generation_of_name_based_UUID) {
     BOOST_REQUIRE_EQUAL(fmt::to_string(uuid), "0290003c-977e-397c-ac3e-fdfdc01d626b");
 }
 
+BOOST_AUTO_TEST_CASE(test_is_name_based) {
+    // Verify that a name-based UUID is identified as such
+    auto uuid = utils::UUID_gen::get_name_UUID("systembatchlog");
+    BOOST_CHECK(uuid.is_name_based());
+
+    // Verify that other UUID types are not name-based
+    BOOST_CHECK(!utils::null_uuid().is_name_based());
+    BOOST_CHECK(!utils::make_random_uuid().is_name_based());
+    BOOST_CHECK(!utils::UUID_gen::get_time_UUID().is_name_based());
+}
+
 using utils::UUID;
 
 BOOST_AUTO_TEST_CASE(test_UUID_comparison) {
diff --git a/test/cluster/test_vnodes_to_tablets_migration.py b/test/cluster/test_vnodes_to_tablets_migration.py
index 93dbe5e09e..4ff464283b 100644
--- a/test/cluster/test_vnodes_to_tablets_migration.py
+++ b/test/cluster/test_vnodes_to_tablets_migration.py
@@ -17,6 +17,7 @@ from test.pylib.rest_client import HTTPError, read_barrier
 from test.pylib.internal_types import ServerInfo
 from test.pylib.manager_client import ManagerClient
 from test.cluster.util import new_test_keyspace, reconnect_driver
+from test.cluster.tasks.task_manager_client import TaskManagerClient
 
 logger = logging.getLogger(__name__)
 
@@ -110,11 +111,31 @@ async def verify_migration_status(manager: ManagerClient, server: ServerInfo,
                                   expected_node_statuses: dict[str, tuple[str, str]],
                                   retries: int = 0, retry_interval: float = 0):
     async def _check():
+        # Verify migration status via the migration status API
         status = await manager.api.get_vnode_tablet_migration_status(server.ip_addr, ks)
         actual_node_statuses = {n['host_id']: (n['current_mode'], n['intended_mode']) for n in status['nodes']}
         assert status['status'] == expected_status, f"Expected migration status '{expected_status}', got '{status['status']}'"
         assert actual_node_statuses == expected_node_statuses, f"Expected node statuses {expected_node_statuses}, got {actual_node_statuses}"
 
+        # Verify migration status via the tasks API
+        tm = TaskManagerClient(manager.api)
+        tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration")
+        ks_tasks = [t for t in tasks if t.keyspace == ks]
+
+        if expected_status == "migrating_to_tablets":
+            assert len(ks_tasks) == 1, f"Expected 1 virtual task for keyspace '{ks}', got {len(ks_tasks)}"
+            task = ks_tasks[0]
+            assert task.state == "running", f"Expected task state 'running', got '{task.state}'"
+            assert task.type == "vnodes_to_tablets_migration", f"Expected task type 'vnodes_to_tablets_migration', got '{task.type}'"
+
+            task_status = await tm.get_task_status(server.ip_addr, task.task_id)
+            expected_total = len(expected_node_statuses)
+            expected_completed = sum(1 for cm, _ in expected_node_statuses.values() if cm == 'tablets')
+            assert task_status.progress_total == expected_total, f"Expected progress_total {expected_total}, got {task_status.progress_total}"
+            assert task_status.progress_completed == expected_completed, f"Expected progress_completed {expected_completed}, got {task_status.progress_completed}"
+        else:
+            assert len(ks_tasks) == 0, f"Expected no virtual tasks for keyspace '{ks}' when status is '{expected_status}', got {len(ks_tasks)}"
+
     for attempt in range(retries + 1):
         try:
             await _check()
@@ -632,6 +653,96 @@ async def test_migration_finalize_before_upgrade(manager: ManagerClient):
         await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks)
 
 
+@pytest.mark.asyncio
+async def test_migration_task_not_abortable(manager: ManagerClient):
+    """Verify that aborting a vnodes-to-tablets migration task via the task manager fails."""
+    server, cql = await setup_single_node(manager)
+
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.t (pk int PRIMARY KEY)")
+        await manager.api.create_vnode_tablet_migration(server.ip_addr, ks)
+
+        tm = TaskManagerClient(manager.api)
+        tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration")
+        ks_tasks = [t for t in tasks if t.keyspace == ks]
+        assert len(ks_tasks) == 1, f"Expected 1 migration task for keyspace '{ks}', got {len(ks_tasks)}"
+
+        task = ks_tasks[0]
+        status = await tm.get_task_status(server.ip_addr, task.task_id)
+        assert status.is_abortable is False, f"Expected task to be non-abortable, got is_abortable={status.is_abortable}"
+
+        with pytest.raises(HTTPError, match="cannot be aborted"):
+            await tm.abort_task(server.ip_addr, task.task_id)
+
+        # Rollback: schema changes are not yet supported for migrating keyspaces.
+        # TODO: Remove this once support is added.
+        await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks)
+
+
+@pytest.mark.asyncio
+async def test_migration_wait_task(manager: ManagerClient):
+    """Verify that the task manager "wait" API works for vnodes-to-tablets migration tasks.
+
+    Exercises two scenarios:
+    1. Wait on a migration task that is rolled back (expect "suspended" state).
+    2. Wait on a migration task that completes successfully (expect "done" state).
+ """ + server, cql = await setup_single_node(manager) + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.t (pk int PRIMARY KEY)") + + tm = TaskManagerClient(manager.api) + + # Scenario 1: wait + rollback + + logger.info("Starting migration") + await manager.api.create_vnode_tablet_migration(server.ip_addr, ks) + + tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration") + assert len(tasks) == 1 + assert tasks[0].keyspace == ks + task_id = tasks[0].task_id + + logger.info("Starting wait on the migration task") + wait_task = asyncio.create_task(tm.wait_for_task(server.ip_addr, task_id)) + + logger.info("Rolling back migration") + await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks) + + logger.info("Expecting wait to finish with 'suspended' state") + wait_status = await wait_task + assert wait_status.state == "suspended", f"Expected 'suspended' after rollback, got '{wait_status.state}'" + assert wait_status.progress_completed == 0, f"Expected 0 upgraded nods for rolled back migration, got {wait_status.progress_completed}" + + # Scenario 2: wait + full migration + + logger.info("Starting migration again") + await manager.api.create_vnode_tablet_migration(server.ip_addr, ks) + + tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration") + assert len(tasks) == 1 + assert tasks[0].keyspace == ks + task_id = tasks[0].task_id + + logger.info("Marking the node for tablets migration and restarting") + await manager.api.upgrade_node_to_tablets(server.ip_addr) + await manager.server_restart(server.server_id) + await reconnect_driver(manager) + cql, _ = await manager.get_ready_cql([server]) + + logger.info("Starting wait on the migration task") + wait_task = asyncio.create_task(tm.wait_for_task(server.ip_addr, task_id)) + + logger.info("Finalizing migration") + await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks) + + logger.info("Expecting wait to finish with 'done' state") + wait_status = await wait_task + assert wait_status.state == "done", f"Expected 'done' after finalization, got '{wait_status.state}'" + assert wait_status.progress_completed == 1, f"Expected 1 upgraded node for completed migration, got {wait_status.progress_completed}" + + @pytest.mark.asyncio async def test_migration_multiple_keyspaces(manager: ManagerClient): """Verify that two keyspaces can be migrated from vnodes to tablets simultaneously.""" diff --git a/utils/UUID.hh b/utils/UUID.hh index 8c5505a431..a34259caa6 100644 --- a/utils/UUID.hh +++ b/utils/UUID.hh @@ -52,6 +52,10 @@ public: return version() == 1; } + bool is_name_based() const noexcept { + return version() == 3; + } + int64_t timestamp() const noexcept { //if (version() != 1) { // throw new UnsupportedOperationException("Not a time-based UUID");