From a00056381f9c8c1162eeeef30dc9ec98b384b9ac Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Thu, 26 Mar 2026 18:59:15 +0200 Subject: [PATCH 1/9] utils: Add UUID::is_name_based() The UUID class already provides `is_timestamp()` for identifying time-based (version 1) UUIDs. Add the analogous `is_name_based()` predicate for version 3 (name-based) UUIDs, along with a test. Signed-off-by: Nikos Dragazis --- test/boost/UUID_test.cc | 11 +++++++++++ utils/UUID.hh | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/test/boost/UUID_test.cc b/test/boost/UUID_test.cc index 28ff626eb5..cf91aa23ca 100644 --- a/test/boost/UUID_test.cc +++ b/test/boost/UUID_test.cc @@ -18,6 +18,17 @@ BOOST_AUTO_TEST_CASE(test_generation_of_name_based_UUID) { BOOST_REQUIRE_EQUAL(fmt::to_string(uuid), "0290003c-977e-397c-ac3e-fdfdc01d626b"); } +BOOST_AUTO_TEST_CASE(test_is_name_based) { + // Verify that a name-based UUID is identified as such + auto uuid = utils::UUID_gen::get_name_UUID("systembatchlog"); + BOOST_CHECK(uuid.is_name_based()); + + // Verify that other UUID types are not name-based + BOOST_CHECK(!utils::null_uuid().is_name_based()); + BOOST_CHECK(!utils::make_random_uuid().is_name_based()); + BOOST_CHECK(!utils::UUID_gen::get_time_UUID().is_name_based()); +} + using utils::UUID; BOOST_AUTO_TEST_CASE(test_UUID_comparison) { diff --git a/utils/UUID.hh b/utils/UUID.hh index 8c5505a431..a34259caa6 100644 --- a/utils/UUID.hh +++ b/utils/UUID.hh @@ -52,6 +52,10 @@ public: return version() == 1; } + bool is_name_based() const noexcept { + return version() == 3; + } + int64_t timestamp() const noexcept { //if (version() != 1) { // throw new UnsupportedOperationException("Not a time-based UUID"); From 3096ba05776ed6c53c0838fbf79533d23a2ac231 Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Fri, 17 Apr 2026 18:25:12 +0300 Subject: [PATCH 2/9] storage_service: Replace migration status string with enum Using a string was sufficient while this status was only exposed through the REST 
API, but the next patches will also consume it internally. Use an enum for the internal representation and convert it back to the existing string values in the REST API. Signed-off-by: Nikos Dragazis --- api/storage_service.cc | 2 +- service/storage_service.cc | 6 +++--- service/storage_service.hh | 23 ++++++++++++++++++++++- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index e7b868a05f..279a9ca867 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1747,7 +1747,7 @@ rest_get_vnode_tablet_migration(http_context& ctx, sharded storage_service::get_tablets_ result.keyspace = ks_name; if (ks.uses_tablets()) { - result.status = "tablets"; + result.status = migration_status::tablets; co_return result; } @@ -4138,11 +4138,11 @@ future storage_service::get_tablets_ }); if (!has_tablet_maps) { - result.status = "vnodes"; + result.status = migration_status::vnodes; co_return result; } - result.status = "migrating_to_tablets"; + result.status = migration_status::migrating_to_tablets; // Pick one table and query system.tablet_sizes to find which nodes // report tablet sizes (i.e. have loaded tablet-based ERMs). 
diff --git a/service/storage_service.hh b/service/storage_service.hh index 6d07225dcd..aaaff56f48 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -297,9 +297,15 @@ public: sstring intended_mode; // "vnodes" or "tablets" }; + enum class migration_status { + vnodes, + migrating_to_tablets, + tablets, + }; + struct keyspace_migration_status { sstring keyspace; - sstring status; // "vnodes", "migrating_to_tablets", or "tablets" + migration_status status; std::vector nodes; }; @@ -1080,3 +1086,18 @@ struct fmt::formatter : fmt::formatter +struct fmt::formatter : fmt::formatter { + template + auto format(service::storage_service::migration_status status, FormatContext& ctx) const { + std::string_view name; + using enum service::storage_service::migration_status; + switch (status) { + case vnodes: name = "vnodes"; break; + case migrating_to_tablets: name = "migrating_to_tablets"; break; + case tablets: name = "tablets"; break; + } + return fmt::format_to(ctx.out(), "{}", name); + } +}; \ No newline at end of file From 46e3902daa3189c48111f593a821db6847814991 Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Fri, 17 Apr 2026 18:19:43 +0300 Subject: [PATCH 3/9] storage_service: Add keyspace-level migration status function `storage_service::get_tablets_migration_status()` returns the keyspace-level migration status, indicating whether migration has not started, is in progress, or has completed, and for migrating keyspaces also returns per-node migration statuses. Rename it to `get_tablets_migration_status_with_node_details()` and introduce a new `get_tablets_migration_status()` that returns only the keyspace-level status. This prepares the function for reuse in the next patches, which will add a virtual task for vnodes-to-tablets migrations. Several task-manager paths will only need the keyspace-level migration state, not per-node information. 
Signed-off-by: Nikos Dragazis --- api/storage_service.cc | 2 +- service/storage_service.cc | 37 ++++++++++++++++++++++--------------- service/storage_service.hh | 3 ++- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 279a9ca867..e14e06996e 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1743,7 +1743,7 @@ rest_get_vnode_tablet_migration(http_context& ctx, sharded storage_service::set_node_intended_storage_mode(intended_storage_mode m slogger.info("Successfully set intended storage mode for node {} to {}", raft_server.id(), mode); } -future storage_service::get_tablets_migration_status(const sstring& ks_name) { - if (this_shard_id() != 0) { - co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) { - return ss.get_tablets_migration_status(ks_name); - }); - } - +storage_service::migration_status storage_service::get_tablets_migration_status(const sstring& ks_name) { auto& db = _db.local(); auto& ks = db.find_keyspace(ks_name); - keyspace_migration_status result; - result.keyspace = ks_name; - if (ks.uses_tablets()) { - result.status = migration_status::tablets; - co_return result; + return migration_status::tablets; } const auto& tm = get_token_metadata(); @@ -4138,14 +4128,31 @@ future storage_service::get_tablets_ }); if (!has_tablet_maps) { - result.status = migration_status::vnodes; + return migration_status::vnodes; + } + + return migration_status::migrating_to_tablets; +} + +future storage_service::get_tablets_migration_status_with_node_details(const sstring& ks_name) { + if (this_shard_id() != 0) { + co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) { + return ss.get_tablets_migration_status_with_node_details(ks_name); + }); + } + + keyspace_migration_status result; + result.keyspace = ks_name; + result.status = get_tablets_migration_status(ks_name); + + if (result.status != migration_status::migrating_to_tablets) { co_return result; } - 
result.status = migration_status::migrating_to_tablets; - // Pick one table and query system.tablet_sizes to find which nodes // report tablet sizes (i.e. have loaded tablet-based ERMs). + auto& ks = _db.local().find_keyspace(ks_name); + auto tables = ks.metadata()->tables(); auto sample_table_id = tables.front()->id(); // FIXME: system.tablet_sizes might return stale data (load stats in the topology coordinator are cached). diff --git a/service/storage_service.hh b/service/storage_service.hh index aaaff56f48..8a8f57d972 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -309,7 +309,8 @@ public: std::vector nodes; }; - future get_tablets_migration_status(const sstring& ks_name); + migration_status get_tablets_migration_status(const sstring& ks_name); + future get_tablets_migration_status_with_node_details(const sstring& ks_name); future<> set_node_intended_storage_mode(intended_storage_mode mode); future<> finalize_tablets_migration(const sstring& ks_name); From ca830c7bce11af3ae4fe32876c246e3d323aff3a Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Thu, 2 Apr 2026 10:03:58 +0300 Subject: [PATCH 4/9] compaction: Add parent_id to table_resharding_compaction_task_impl Required to link it with the migration task in the next patches. 
Signed-off-by: Nikos Dragazis --- compaction/task_manager_module.hh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index d23ff97030..9014866ee9 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -698,12 +698,13 @@ public: table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module, std::string keyspace, std::string table, + tasks::task_id parent_id, sharded& dir, sharded& db, compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) noexcept - : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id()) + : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), parent_id ? 0 : module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", parent_id) , _dir(dir) , _db(db) , _creator(std::move(creator)) From d1ca01b25daeb26f8dd5269e84808f38f2247a8f Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Thu, 2 Apr 2026 00:39:44 +0300 Subject: [PATCH 5/9] storage_service: Guard migration status against uninitialized group0 `storage_service::get_tablets_migration_status()` reads a group0 virtual table, so it requires group0 to be initialized. When invoked via the migration REST API, this condition is satisfied since the API is only available after joining group0. However, once this function is integrated into the task API later in this series, the assumption will no longer hold, as the task API is exposed earlier in the startup process. Add a guard to detect this condition and return a clear error message. 
Signed-off-by: Nikos Dragazis --- service/storage_service.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 164f41ea3a..48900fbb00 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4149,6 +4149,13 @@ future storage_service::get_tablets_ co_return result; } + // system.tablet_sizes is a group0_virtual_table, so it requires group0 to + // be initialized. If this function is called via the task manager API, + // group0 may not be initialized yet. + if (!_group0 || !_group0->joined_group0()) { + throw std::runtime_error(::format("Cannot fetch node statuses for migrating keyspace '{}': group0 is not yet initialized on this node", ks_name)); + } + // Pick one table and query system.tablet_sizes to find which nodes // report tablet sizes (i.e. have loaded tablet-based ERMs). auto& ks = _db.local().find_keyspace(ks_name); From 696f9f8954a3c95a4fd995a827c61294bb8ab964 Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Thu, 26 Mar 2026 13:01:37 +0200 Subject: [PATCH 6/9] service: Add virtual task for vnodes-to-tablets migrations Add a virtual task that exposes in-progress vnodes-to-tablets migrations through the task manager API. The task is synthesized from the current migration state, so completed migrations are not shown. Progress is reported as the number of nodes that currently use tablets: it increases on the forward path and decreases on rollback. For simplicity, per-node storage modes are not exposed in the task status; callers that need them should use the migration status REST endpoint. Unlike regular tasks that use time-based UUIDs, this task uses deterministic named UUIDs derived from the keyspace names. This keeps the implementation simple (no need to persist them) and gives each keyspace a stable task ID. 
The downside is that the start time of each task is unknown and repeated migrations of the same keyspace (migration -> rollback -> new migration) cannot be distinguished. Introduce a new task manager module to keep them separate from other tasks. Add support for `wait()`. While its practical value is debatable (migration is a manual procedure, rolling restart will interrupt it), it keeps the task consistent with the task manager interface. Signed-off-by: Nikos Dragazis --- service/storage_service.cc | 4 + service/storage_service.hh | 2 + service/task_manager_module.cc | 179 +++++++++++++++++++++++++++++++++ service/task_manager_module.hh | 38 +++++++ tasks/task_manager.hh | 1 + tasks/virtual_task_hint.hh | 1 + 6 files changed, 225 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 48900fbb00..5a1a65c80e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -225,6 +225,7 @@ storage_service::storage_service(abort_source& abort_source, , _node_ops_module(make_shared(tm, *this)) , _tablets_module(make_shared(tm, *this)) , _global_topology_requests_module(make_shared(tm)) + , _vnodes_to_tablets_migration_module(make_shared(tm, *this)) , _address_map(address_map) , _shared_token_metadata(stm) , _erm_factory(erm_factory) @@ -250,10 +251,12 @@ storage_service::storage_service(abort_source& abort_source, tm.register_module(_node_ops_module->get_name(), _node_ops_module); tm.register_module(_tablets_module->get_name(), _tablets_module); tm.register_module(_global_topology_requests_module->get_name(), _global_topology_requests_module); + tm.register_module(_vnodes_to_tablets_migration_module->get_name(), _vnodes_to_tablets_migration_module); if (this_shard_id() == 0) { _node_ops_module->make_virtual_task(*this); _tablets_module->make_virtual_task(*this); _global_topology_requests_module->make_virtual_task(*this); + _vnodes_to_tablets_migration_module->make_virtual_task(*this); } register_metrics(); @@ -2342,6 +2345,7 
@@ future<> storage_service::stop() { co_await _tablets_module->stop(); co_await _node_ops_module->stop(); co_await _global_topology_requests_module->stop(); + co_await _vnodes_to_tablets_migration_module->stop(); co_await _async_gate.close(); _tablet_split_monitor_event.signal(); co_await std::move(_tablet_split_monitor); diff --git a/service/storage_service.hh b/service/storage_service.hh index 8a8f57d972..8f70697d4e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -228,6 +228,7 @@ private: shared_ptr _node_ops_module; shared_ptr _tablets_module; shared_ptr _global_topology_requests_module; + shared_ptr _vnodes_to_tablets_migration_module; gms::gossip_address_map& _address_map; future do_tablet_operation(locator::global_tablet_id tablet, sstring op_name, @@ -1061,6 +1062,7 @@ public: friend class tasks::task_manager; friend class tablet_virtual_task; friend class topo::global_topology_request_virtual_task; + friend class vnodes_to_tablets::migration_virtual_task; }; } diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc index 6428c44622..02c04fd402 100644 --- a/service/task_manager_module.cc +++ b/service/task_manager_module.cc @@ -15,6 +15,7 @@ #include "service/topology_state_machine.hh" #include "tasks/task_handler.hh" #include "tasks/virtual_task_hint.hh" +#include "utils/UUID_gen.hh" #include namespace service { @@ -469,5 +470,183 @@ task_manager_module::task_manager_module(tasks::task_manager& tm) noexcept } +namespace vnodes_to_tablets { + +tasks::task_id migration_virtual_task::make_task_id(const sstring& keyspace) { + // Prefix the keyspace name with a unique identifier for this task to avoid + // collisions with other named UUIDs that may be added in the future. 
+ auto uuid = utils::UUID_gen::get_name_UUID("vnodes_to_tablets_migration:" + keyspace); + return tasks::task_id{uuid}; +} + +std::optional migration_virtual_task::find_keyspace_for_task_id(tasks::task_id id) const { + auto& db = _ss._db.local(); + // Note: We could use a cache here, but it's probably an overkill. + // Scylla supports up to 1000 keyspaces, which is a relatively small number + // for UUID computation. + for (const auto& ks_name : db.get_all_keyspaces()) { + if (make_task_id(ks_name) == id) { + return ks_name; + } + } + return std::nullopt; +} + +tasks::task_manager::task_group migration_virtual_task::get_group() const noexcept { + return tasks::task_manager::task_group::vnodes_to_tablets_migration_group; +} + +future> migration_virtual_task::contains(tasks::task_id task_id) const { + if (!task_id.uuid().is_name_based()) { + // Task id of migration task is always a named UUID. + co_return std::nullopt; + } + auto ks_name = find_keyspace_for_task_id(task_id); + if (!ks_name) { + co_return std::nullopt; + } + auto status = _ss.get_tablets_migration_status(*ks_name); + if (status != storage_service::migration_status::migrating_to_tablets) { + co_return std::nullopt; + } + co_return tasks::virtual_task_hint{.keyspace_name = *ks_name}; +} + +tasks::task_stats migration_virtual_task::make_task_stats(tasks::task_id id, const sstring& keyspace) { + return tasks::task_stats{ + .task_id = id, + .type = "vnodes_to_tablets_migration", + .kind = tasks::task_kind::cluster, + .scope = "keyspace", + .state = tasks::task_manager::task_state::running, + .sequence_number = 0, + .keyspace = keyspace, + .table = "", + .entity = "", + .shard = 0, + .start_time = {}, + .end_time = {}, + }; +} + +tasks::task_status migration_virtual_task::make_task_status(tasks::task_id id, const sstring& keyspace, + tasks::task_manager::task_state state, + tasks::task_manager::task::progress progress) { + auto stats = make_task_stats(id, keyspace); + return tasks::task_status{ + .task_id = 
stats.task_id, + .type = stats.type, + .kind = stats.kind, + .scope = stats.scope, + .state = state, + .is_abortable = tasks::is_abortable::no, + .start_time = stats.start_time, + .end_time = stats.end_time, + .parent_id = tasks::task_id::create_null_id(), + .sequence_number = stats.sequence_number, + .shard = stats.shard, + .keyspace = stats.keyspace, + .table = stats.table, + .entity = stats.entity, + .progress_units = "nodes", + .progress = progress, + }; +} + +future> migration_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) { + auto& ks_name = hint.keyspace_name; + if (!ks_name) { + co_return std::nullopt; + } + storage_service::keyspace_migration_status status; + try { + status = co_await _ss.get_tablets_migration_status_with_node_details(*ks_name); + } catch (const replica::no_such_keyspace&) { + co_return std::nullopt; + } + if (status.status != storage_service::migration_status::migrating_to_tablets) { + co_return std::nullopt; + } + + // The progress tracks the number of nodes currently using tablets. + // During forward migration it increases; during rollback it decreases. 
+ double nodes_upgraded = 0; + double total_nodes = status.nodes.size(); + for (const auto& node : status.nodes) { + if (node.current_mode == "tablets") { + nodes_upgraded++; + } + } + + auto task_state = tasks::task_manager::task_state::running; + auto task_progress = tasks::task_manager::task::progress{nodes_upgraded, total_nodes}; + + co_return make_task_status(id, *ks_name, task_state, task_progress); +} + +future> migration_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) { + auto& ks_name = hint.keyspace_name; + if (!ks_name) { + co_return std::nullopt; + } + + storage_service::migration_status status; + while (true) { + try { + status = _ss.get_tablets_migration_status(*ks_name); + } catch (const replica::no_such_keyspace&) { + co_return std::nullopt; + } + if (status != storage_service::migration_status::migrating_to_tablets) { + break; + } + co_await _ss._topology_state_machine.event.wait(); + } + + bool migration_succeeded = status == storage_service::migration_status::tablets; + auto state = migration_succeeded ? tasks::task_manager::task_state::done : tasks::task_manager::task_state::suspended; + double total_nodes = _ss._topology_state_machine._topology.normal_nodes.size(); + auto task_progress = tasks::task_manager::task::progress{migration_succeeded ? total_nodes : 0, total_nodes}; + + auto task_status = make_task_status(id, *ks_name, state, task_progress); + task_status.end_time = db_clock::now(); + + co_return task_status; +} + +future<> migration_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept { + // Vnodes-to-tablets migration cannot be aborted via the task manager. + // It requires a manual rollback procedure. 
+ return make_ready_future<>(); +} + +future> migration_virtual_task::get_stats() { + std::vector result; + auto& db = _ss._db.local(); + for (const auto& ks_name : db.get_all_keyspaces()) { + storage_service::migration_status status; + try { + status = _ss.get_tablets_migration_status(ks_name); + } catch (const replica::no_such_keyspace&) { + continue; + } + if (status != storage_service::migration_status::migrating_to_tablets) { + continue; + } + result.push_back(make_task_stats(make_task_id(ks_name), ks_name)); + } + co_return result; +} + +task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept + : tasks::task_manager::module(tm, "vnodes_to_tablets_migration") + , _ss(ss) +{} + +std::set task_manager_module::get_nodes() const { + return get_task_manager().get_nodes(_ss); +} + +} } diff --git a/service/task_manager_module.hh b/service/task_manager_module.hh index 20443a7d87..103f73cfe7 100644 --- a/service/task_manager_module.hh +++ b/service/task_manager_module.hh @@ -83,4 +83,42 @@ public: } +namespace vnodes_to_tablets { + +class migration_virtual_task : public tasks::task_manager::virtual_task::impl { +private: + service::storage_service& _ss; +public: + migration_virtual_task(tasks::task_manager::module_ptr module, + service::storage_service& ss) + : tasks::task_manager::virtual_task::impl(std::move(module)) + , _ss(ss) + {} + virtual tasks::task_manager::task_group get_group() const noexcept override; + virtual future> contains(tasks::task_id task_id) const override; + + virtual future> get_status(tasks::task_id id, tasks::virtual_task_hint hint) override; + virtual future> wait(tasks::task_id id, tasks::virtual_task_hint hint) override; + virtual future<> abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept override; + virtual future> get_stats() override; +private: + static tasks::task_id make_task_id(const sstring& keyspace); + std::optional find_keyspace_for_task_id(tasks::task_id id) const; 
+ static tasks::task_stats make_task_stats(tasks::task_id id, const sstring& keyspace); + static tasks::task_status make_task_status(tasks::task_id id, const sstring& keyspace, + tasks::task_manager::task_state state, + tasks::task_manager::task::progress progress); +}; + +class task_manager_module : public tasks::task_manager::module { +private: + service::storage_service& _ss; +public: + task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept; + + std::set get_nodes() const override; +}; + +} + } diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index 9c1591202f..bca6854e7a 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -114,6 +114,7 @@ public: topology_change_group, tablets_group, global_topology_change_group, + vnodes_to_tablets_migration_group, }; class task : public enable_lw_shared_from_this { diff --git a/tasks/virtual_task_hint.hh b/tasks/virtual_task_hint.hh index ebe6427319..2a23b13fa2 100644 --- a/tasks/virtual_task_hint.hh +++ b/tasks/virtual_task_hint.hh @@ -18,6 +18,7 @@ struct virtual_task_hint { std::optional task_type; std::optional tablet_id; std::optional node_id; + std::optional keyspace_name; locator::tablet_task_type get_task_type() const; locator::tablet_id get_tablet_id() const; From a3aa4f6cb479d043ad8143c41740775a208c0401 Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Tue, 31 Mar 2026 22:06:01 +0300 Subject: [PATCH 7/9] distributed_loader: Make table_populator aware of migration rollbacks The `table_populator` uses a `migrate_to_tablets` flag to distinguish normal tables from tables under vnodes-to-tablets migration (forward path), since the two require different resharding. The next patch will set the parent info of migration-related resharding compaction tasks so they appear as children of the migration virtual task. For that, the table populator needs to recognize not only migrations in the forward path, but rollbacks as well. 
Replace the flag with a tri-state `migration_direction` enum (none, forward, rollback). Signed-off-by: Nikos Dragazis --- replica/distributed_loader.cc | 42 +++++++++++++++++++++++++---------- replica/distributed_loader.hh | 3 ++- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 87ce94ecd4..6d4162f027 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -279,21 +279,29 @@ distributed_loader::get_sstables_from(sharded& db, sstring ks } class table_populator { +public: + enum class migration_direction { + none, // no migration + forward, // vnodes -> tablets + rollback // tablets -> vnodes (rollback) + }; + +private: sharded& _db; sstring _ks; sstring _cf; global_table_ptr& _global_table; std::vector>> _sstable_directories; sstables::sstable_version_types _version_for_reshaping = sstables::oldest_writable_sstable_format; - bool _migrate_to_tablets = false; + migration_direction _migration_direction; public: - table_populator(global_table_ptr& ptr, sharded& db, sstring ks, sstring cf, bool migrate_to_tablets = false) + table_populator(global_table_ptr& ptr, sharded& db, sstring ks, sstring cf, migration_direction md = migration_direction::none) : _db(db) , _ks(std::move(ks)) , _cf(std::move(cf)) , _global_table(ptr) - , _migrate_to_tablets(migrate_to_tablets) + , _migration_direction(md) { } @@ -396,10 +404,12 @@ sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_s future<> table_populator::populate_subdir(sharded& directory) { auto state = directory.local().state(); - dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, _migrate_to_tablets ? 
"vnodes-to-tablets" : "normal"); + bool vnodes_resharding = _migration_direction == migration_direction::forward; + dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, vnodes_resharding ? "vnodes-to-tablets" : "normal"); compaction::owned_ranges_ptr owned_ranges_ptr = nullptr; - if (_migrate_to_tablets) { + + if (vnodes_resharding) { // Build owned_ranges from the tablet map. auto table_uuid = _global_table->schema()->id(); auto& tmap = _db.local().get_shared_token_metadata().get()->tablets().get_tablet_map(table_uuid); @@ -417,7 +427,7 @@ future<> table_populator::populate_subdir(sharded& }).get(); return make_sstable(*_global_table, state, gen, _version_for_reshaping); - }, owned_ranges_ptr, _migrate_to_tablets); + }, owned_ranges_ptr, vnodes_resharding); // The node is offline at this point so we are very lenient with what we consider // offstrategy. @@ -449,7 +459,8 @@ future<> table_populator::populate_subdir(sharded& } future<> distributed_loader::populate_keyspace(sharded& db, - sharded& sys_ks, keyspace& ks, sstring ks_name) + sharded& sys_ks, keyspace& ks, sstring ks_name, + std::optional storage_mode) { dblog.info("Populating Keyspace {}", ks_name); @@ -461,12 +472,19 @@ future<> distributed_loader::populate_keyspace(sharded& db, dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf.get_storage_options()); - bool migrating_to_tablets = cf.uses_tablets() && !ks.uses_tablets(); - if (migrating_to_tablets) { - dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode", ks_name, cfname); + using md = table_populator::migration_direction; + auto& tablet_metadata = db.local().get_shared_token_metadata().get()->tablets(); + auto direction = [&]() { + if (ks.uses_tablets()) { return md::none; } + if (cf.uses_tablets()) { return md::forward; } + if (tablet_metadata.has_tablet_map(uuid) && storage_mode == 
service::intended_storage_mode::vnodes) { return md::rollback; } + return md::none; + }(); + if (direction != md::none) { + dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode (direction: {})", ks_name, cfname, direction == md::forward ? "forward" : "rollback"); } - auto metadata = table_populator(gtable, db, ks_name, cfname, migrating_to_tablets); + auto metadata = table_populator(gtable, db, ks_name, cfname, direction); std::exception_ptr ex; try { @@ -583,7 +601,7 @@ future<> distributed_loader::init_non_system_keyspaces(sharded make_sstables_available(sstables::sstable_directory& dir, sharded& db, sharded& vb, sharded& vbw, db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf); - static future<> populate_keyspace(sharded& db, sharded& sys_ks, keyspace& ks, sstring ks_name); + static future<> populate_keyspace(sharded& db, sharded& sys_ks, keyspace& ks, sstring ks_name, + std::optional storage_mode = std::nullopt); static future>>> get_sstables_from(sharded& db, sstring ks, sstring cf, sstables::sstable_open_config cfg, noncopyable_function(global_table_ptr&, sharded&)> start_dir); From 295e43478112b3d27e10c11d6998f01374d8b10a Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Tue, 31 Mar 2026 21:48:54 +0300 Subject: [PATCH 8/9] distributed_loader: Link resharding tasks to migration virtual task When a table is loaded on startup during a vnodes-to-tablets migration (forward or rollback), the `table_populator` runs a resharding compaction. Set the migration virtual task as parent of the resharding task. This enables users to easily find all node-local resharding tasks related to a particular migration. Make `migration_virtual_task::make_task_id()` public so that the `distributed_loader` can compute the migration's task ID. 
Signed-off-by: Nikos Dragazis --- replica/distributed_loader.cc | 12 +++++++++--- replica/distributed_loader.hh | 2 +- service/task_manager_module.cc | 5 +++++ service/task_manager_module.hh | 2 +- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 6d4162f027..54e7919363 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -23,6 +23,7 @@ #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" #include "db/schema_tables.hh" +#include "service/task_manager_module.hh" #include "compaction/compaction_manager.hh" #include "compaction/task_manager_module.hh" #include "sstables/sstables.hh" @@ -101,9 +102,9 @@ distributed_loader::lock_table(global_table_ptr& table, sharded -distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) { +distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding, tasks::task_info parent_info) { auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); - auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding); + auto task = co_await compaction_module.make_and_start_task(parent_info, std::move(ks_name), std::move(table_name), parent_info.id, dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding); co_await task->done(); } @@ -407,6 +408,7 @@ future<> table_populator::populate_subdir(sharded& bool vnodes_resharding = _migration_direction == migration_direction::forward; dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, 
_cf, _global_table->get_storage_options(), state, vnodes_resharding ? "vnodes-to-tablets" : "normal"); + tasks::task_info parent_info; compaction::owned_ranges_ptr owned_ranges_ptr = nullptr; if (vnodes_resharding) { @@ -421,13 +423,17 @@ future<> table_populator::populate_subdir(sharded& owned_ranges_ptr = compaction::make_owned_ranges_ptr(std::move(ranges)); } + if (_migration_direction != migration_direction::none) { + parent_info = tasks::task_info(service::vnodes_to_tablets::migration_virtual_task::make_task_id(_ks), 0); + } + co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable { auto gen = smp::submit_to(shard, [this] () { return _global_table->calculate_generation_for_new_table(); }).get(); return make_sstable(*_global_table, state, gen, _version_for_reshaping); - }, owned_ranges_ptr, vnodes_resharding); + }, owned_ranges_ptr, vnodes_resharding, parent_info); // The node is offline at this point so we are very lenient with what we consider // offstrategy. 
diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 947cb87a32..24759b183a 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -70,7 +70,7 @@ class distributed_loader { static future<> reshape(sharded& dir, sharded& db, compaction::reshape_mode mode, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function filter); static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, - compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false); + compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false, tasks::task_info parent_info = {}); static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(global_table_ptr&, sharded& dir); static future make_sstables_available(sstables::sstable_directory& dir, diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc index 02c04fd402..bcce8059c0 100644 --- a/service/task_manager_module.cc +++ b/service/task_manager_module.cc @@ -581,6 +581,11 @@ future> migration_virtual_task::get_status(tas auto task_state = tasks::task_manager::task_state::running; auto task_progress = tasks::task_manager::task::progress{nodes_upgraded, total_nodes}; + // Note: Children are left empty. Although the resharding tasks are children + // of the migration task, using get_children() here would be pointless + // because resharding runs during startup before start_listen() is called, + // so the RPC fan-out to collect children from all nodes (including self) + // would fail. 
co_return make_task_status(id, *ks_name, task_state, task_progress); } diff --git a/service/task_manager_module.hh b/service/task_manager_module.hh index 103f73cfe7..7eb9b57a8c 100644 --- a/service/task_manager_module.hh +++ b/service/task_manager_module.hh @@ -101,8 +101,8 @@ public: virtual future> wait(tasks::task_id id, tasks::virtual_task_hint hint) override; virtual future<> abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept override; virtual future> get_stats() override; -private: static tasks::task_id make_task_id(const sstring& keyspace); +private: std::optional find_keyspace_for_task_id(tasks::task_id id) const; static tasks::task_stats make_task_stats(tasks::task_id id, const sstring& keyspace); static tasks::task_status make_task_status(tasks::task_id id, const sstring& keyspace, From d361a0dd83ea8c0632a2e041460cdb54efcc2605 Mon Sep 17 00:00:00 2001 From: Nikos Dragazis Date: Thu, 26 Mar 2026 13:02:04 +0200 Subject: [PATCH 9/9] test: cluster: Verify vnodes-to-tablets migration virtual task Signed-off-by: Nikos Dragazis --- .../test_vnodes_to_tablets_migration.py | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/test/cluster/test_vnodes_to_tablets_migration.py b/test/cluster/test_vnodes_to_tablets_migration.py index 93dbe5e09e..4ff464283b 100644 --- a/test/cluster/test_vnodes_to_tablets_migration.py +++ b/test/cluster/test_vnodes_to_tablets_migration.py @@ -17,6 +17,7 @@ from test.pylib.rest_client import HTTPError, read_barrier from test.pylib.internal_types import ServerInfo from test.pylib.manager_client import ManagerClient from test.cluster.util import new_test_keyspace, reconnect_driver +from test.cluster.tasks.task_manager_client import TaskManagerClient logger = logging.getLogger(__name__) @@ -110,11 +111,31 @@ async def verify_migration_status(manager: ManagerClient, server: ServerInfo, expected_node_statuses: dict[str, tuple[str, str]], retries: int = 0, retry_interval: float = 0): async def _check(): + # 
Verify migration status via the migration status API status = await manager.api.get_vnode_tablet_migration_status(server.ip_addr, ks) actual_node_statuses = {n['host_id']: (n['current_mode'], n['intended_mode']) for n in status['nodes']} assert status['status'] == expected_status, f"Expected migration status '{expected_status}', got '{status['status']}'" assert actual_node_statuses == expected_node_statuses, f"Expected node statuses {expected_node_statuses}, got {actual_node_statuses}" + # Verify migration status via the tasks API + tm = TaskManagerClient(manager.api) + tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration") + ks_tasks = [t for t in tasks if t.keyspace == ks] + + if expected_status == "migrating_to_tablets": + assert len(ks_tasks) == 1, f"Expected 1 virtual task for keyspace '{ks}', got {len(ks_tasks)}" + task = ks_tasks[0] + assert task.state == "running", f"Expected task state 'running', got '{task.state}'" + assert task.type == "vnodes_to_tablets_migration", f"Expected task type 'vnodes_to_tablets_migration', got '{task.type}'" + + task_status = await tm.get_task_status(server.ip_addr, task.task_id) + expected_total = len(expected_node_statuses) + expected_completed = sum(1 for cm, _ in expected_node_statuses.values() if cm == 'tablets') + assert task_status.progress_total == expected_total, f"Expected progress_total {expected_total}, got {task_status.progress_total}" + assert task_status.progress_completed == expected_completed, f"Expected progress_completed {expected_completed}, got {task_status.progress_completed}" + else: + assert len(ks_tasks) == 0, f"Expected no virtual tasks for keyspace '{ks}' when status is '{expected_status}', got {len(ks_tasks)}" + for attempt in range(retries + 1): try: await _check() @@ -632,6 +653,96 @@ async def test_migration_finalize_before_upgrade(manager: ManagerClient): await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks) +@pytest.mark.asyncio +async def 
test_migration_task_not_abortable(manager: ManagerClient): + """Verify that aborting a vnodes-to-tablets migration task via the task manager fails.""" + server, cql = await setup_single_node(manager) + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.t (pk int PRIMARY KEY)") + await manager.api.create_vnode_tablet_migration(server.ip_addr, ks) + + tm = TaskManagerClient(manager.api) + tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration") + ks_tasks = [t for t in tasks if t.keyspace == ks] + assert len(ks_tasks) == 1, f"Expected 1 migration task for keyspace '{ks}', got {len(ks_tasks)}" + + task = ks_tasks[0] + status = await tm.get_task_status(server.ip_addr, task.task_id) + assert status.is_abortable is False, f"Expected task to be non-abortable, got is_abortable={status.is_abortable}" + + with pytest.raises(HTTPError, match="cannot be aborted"): + await tm.abort_task(server.ip_addr, task.task_id) + + # Rollback: schema changes are not yet supported for migrating keyspaces. + # TODO: Remove this once support is added. + await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks) + + +@pytest.mark.asyncio +async def test_migration_wait_task(manager: ManagerClient): + """Verify that the task manager "wait" API works for vnodes-to-tablets migration tasks. + + Exercises two scenarios: + 1. Wait on a migration task that is rolled back — expect "suspended" state. + 2. Wait on a migration task that completes successfully — expect "done" state. 
+ """ + server, cql = await setup_single_node(manager) + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.t (pk int PRIMARY KEY)") + + tm = TaskManagerClient(manager.api) + + # Scenario 1: wait + rollback + + logger.info("Starting migration") + await manager.api.create_vnode_tablet_migration(server.ip_addr, ks) + + tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration") + assert len(tasks) == 1 + assert tasks[0].keyspace == ks + task_id = tasks[0].task_id + + logger.info("Starting wait on the migration task") + wait_task = asyncio.create_task(tm.wait_for_task(server.ip_addr, task_id)) + + logger.info("Rolling back migration") + await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks) + + logger.info("Expecting wait to finish with 'suspended' state") + wait_status = await wait_task + assert wait_status.state == "suspended", f"Expected 'suspended' after rollback, got '{wait_status.state}'" + assert wait_status.progress_completed == 0, f"Expected 0 upgraded nodes for rolled back migration, got {wait_status.progress_completed}" + + # Scenario 2: wait + full migration + + logger.info("Starting migration again") + await manager.api.create_vnode_tablet_migration(server.ip_addr, ks) + + tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration") + assert len(tasks) == 1 + assert tasks[0].keyspace == ks + task_id = tasks[0].task_id + + logger.info("Marking the node for tablets migration and restarting") + await manager.api.upgrade_node_to_tablets(server.ip_addr) + await manager.server_restart(server.server_id) + await reconnect_driver(manager) + cql, _ = await manager.get_ready_cql([server]) + + logger.info("Starting wait on the migration task") + wait_task = asyncio.create_task(tm.wait_for_task(server.ip_addr, task_id)) + + logger.info("Finalizing migration") + await
manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks) + + logger.info("Expecting wait to finish with 'done' state") + wait_status = await wait_task + assert wait_status.state == "done", f"Expected 'done' after finalization, got '{wait_status.state}'" + assert wait_status.progress_completed == 1, f"Expected 1 upgraded node for completed migration, got {wait_status.progress_completed}" + + @pytest.mark.asyncio async def test_migration_multiple_keyspaces(manager: ManagerClient): """Verify that two keyspaces can be migrated from vnodes to tablets simultaneously."""