Merge 'Add virtual task for vnodes-to-tablets migrations' from Nikos Dragazis

This PR exposes vnodes-to-tablets migrations through the task manager API via a virtual task. This allows users to list, query the status of, and wait on ongoing migrations through a standard interface, consistent with how other global operations, such as tablet operations and topology requests, are already exposed.

The virtual task exposes all migrations that are currently in progress. Each migrating keyspace appears as a separate task, identified by a deterministic name-based (v3) UUID derived from the keyspace name. Progress is reported as the number of nodes that have switched to tablets vs. the total. The number increases on the forward path and decreases on rollback.

The task is not abortable - rolling back a migration requires a manual procedure.

The `wait` API blocks until the migration either completes (returning `done`) or is rolled back (returning `suspended`).

Example output:
```
$ scylla nodetool tasks list vnodes_to_tablets_migration
task_id                              type                        kind    scope    state   sequence_number keyspace table entity shard start_time end_time
1747b573-6cd6-312d-abb1-9b66c1c2d81f vnodes_to_tablets_migration cluster keyspace running 0               ks                    0

$ scylla nodetool tasks status 1747b573-6cd6-312d-abb1-9b66c1c2d81f
id: 1747b573-6cd6-312d-abb1-9b66c1c2d81f
type: vnodes_to_tablets_migration
kind: cluster
scope: keyspace
state: running
is_abortable: false
start_time:
end_time:
error:
parent_id: none
sequence_number: 0
shard: 0
keyspace: ks
table:
entity:
progress_units: nodes
progress_total: 3
progress_completed: 0
```

Fixes SCYLLADB-1150.

New feature, no backport needed.

Closes scylladb/scylladb#29256

* github.com:scylladb/scylladb:
  test: cluster: Verify vnodes-to-tablets migration virtual task
  distributed_loader: Link resharding tasks to migration virtual task
  distributed_loader: Make table_populator aware of migration rollbacks
  service: Add virtual task for vnodes-to-tablets migrations
  storage_service: Guard migration status against uninitialized group0
  compaction: Add parent_id to table_resharding_compaction_task_impl
  storage_service: Add keyspace-level migration status function
  storage_service: Replace migration status string with enum
  utils: Add UUID::is_name_based()
This commit is contained in:
Avi Kivity
2026-04-19 00:56:33 +03:00
13 changed files with 453 additions and 35 deletions

View File

@@ -1743,11 +1743,11 @@ rest_get_vnode_tablet_migration(http_context& ctx, sharded<service::storage_serv
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto keyspace = validate_keyspace(ctx, req);
auto status = co_await ss.local().get_tablets_migration_status(keyspace);
auto status = co_await ss.local().get_tablets_migration_status_with_node_details(keyspace);
ss::vnode_tablet_migration_status result;
result.keyspace = status.keyspace;
result.status = status.status;
result.status = fmt::format("{}", status.status);
result.nodes._set = true;
for (const auto& node : status.nodes) {
ss::vnode_tablet_migration_node_status n;

View File

@@ -698,12 +698,13 @@ public:
table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
tasks::task_id parent_id,
sharded<sstables::sstable_directory>& dir,
sharded<replica::database>& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr,
bool vnodes_resharding) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), parent_id ? 0 : module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", parent_id)
, _dir(dir)
, _db(db)
, _creator(std::move(creator))

View File

@@ -23,6 +23,7 @@
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
#include "service/task_manager_module.hh"
#include "compaction/compaction_manager.hh"
#include "compaction/task_manager_module.hh"
#include "sstables/sstables.hh"
@@ -101,9 +102,9 @@ distributed_loader::lock_table(global_table_ptr& table, sharded<sstables::sstabl
// - The second part calls each shard's distributed object to reshard the SSTables they were
// assigned.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) {
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding, tasks::task_info parent_info) {
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
auto task = co_await compaction_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>(parent_info, std::move(ks_name), std::move(table_name), parent_info.id, dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
co_await task->done();
}
@@ -279,21 +280,29 @@ distributed_loader::get_sstables_from(sharded<replica::database>& db, sstring ks
}
class table_populator {
public:
enum class migration_direction {
none, // no migration
forward, // vnodes -> tablets
rollback // tablets -> vnodes (rollback)
};
private:
sharded<replica::database>& _db;
sstring _ks;
sstring _cf;
global_table_ptr& _global_table;
std::vector<lw_shared_ptr<sharded<sstables::sstable_directory>>> _sstable_directories;
sstables::sstable_version_types _version_for_reshaping = sstables::oldest_writable_sstable_format;
bool _migrate_to_tablets = false;
migration_direction _migration_direction;
public:
table_populator(global_table_ptr& ptr, sharded<replica::database>& db, sstring ks, sstring cf, bool migrate_to_tablets = false)
table_populator(global_table_ptr& ptr, sharded<replica::database>& db, sstring ks, sstring cf, migration_direction md = migration_direction::none)
: _db(db)
, _ks(std::move(ks))
, _cf(std::move(cf))
, _global_table(ptr)
, _migrate_to_tablets(migrate_to_tablets)
, _migration_direction(md)
{
}
@@ -396,10 +405,13 @@ sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_s
future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>& directory) {
auto state = directory.local().state();
dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, _migrate_to_tablets ? "vnodes-to-tablets" : "normal");
bool vnodes_resharding = _migration_direction == migration_direction::forward;
dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, vnodes_resharding ? "vnodes-to-tablets" : "normal");
tasks::task_info parent_info;
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr;
if (_migrate_to_tablets) {
if (vnodes_resharding) {
// Build owned_ranges from the tablet map.
auto table_uuid = _global_table->schema()->id();
auto& tmap = _db.local().get_shared_token_metadata().get()->tablets().get_tablet_map(table_uuid);
@@ -411,13 +423,17 @@ future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>&
owned_ranges_ptr = compaction::make_owned_ranges_ptr(std::move(ranges));
}
if (_migration_direction != migration_direction::none) {
parent_info = tasks::task_info(service::vnodes_to_tablets::migration_virtual_task::make_task_id(_ks), 0);
}
co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [this] () {
return _global_table->calculate_generation_for_new_table();
}).get();
return make_sstable(*_global_table, state, gen, _version_for_reshaping);
}, owned_ranges_ptr, _migrate_to_tablets);
}, owned_ranges_ptr, vnodes_resharding, parent_info);
// The node is offline at this point so we are very lenient with what we consider
// offstrategy.
@@ -449,7 +465,8 @@ future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>&
}
future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,
sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name)
sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name,
std::optional<service::intended_storage_mode> storage_mode)
{
dblog.info("Populating Keyspace {}", ks_name);
@@ -461,12 +478,19 @@ future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,
dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf.get_storage_options());
bool migrating_to_tablets = cf.uses_tablets() && !ks.uses_tablets();
if (migrating_to_tablets) {
dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode", ks_name, cfname);
using md = table_populator::migration_direction;
auto& tablet_metadata = db.local().get_shared_token_metadata().get()->tablets();
auto direction = [&]() {
if (ks.uses_tablets()) { return md::none; }
if (cf.uses_tablets()) { return md::forward; }
if (tablet_metadata.has_tablet_map(uuid) && storage_mode == service::intended_storage_mode::vnodes) { return md::rollback; }
return md::none;
}();
if (direction != md::none) {
dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode (direction: {})", ks_name, cfname, direction == md::forward ? "forward" : "rollback");
}
auto metadata = table_populator(gtable, db, ks_name, cfname, migrating_to_tablets);
auto metadata = table_populator(gtable, db, ks_name, cfname, direction);
std::exception_ptr ex;
try {
@@ -583,7 +607,7 @@ future<> distributed_loader::init_non_system_keyspaces(sharded<replica::database
continue;
}
futures.emplace_back(distributed_loader::populate_keyspace(db, sys_ks, ks.second, ks_name));
futures.emplace_back(distributed_loader::populate_keyspace(db, sys_ks, ks.second, ks_name, storage_mode));
}
when_all_succeed(futures.begin(), futures.end()).discard_result().get();

View File

@@ -70,13 +70,14 @@ class distributed_loader {
static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, compaction::reshape_mode mode,
sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function<bool (const sstables::shared_sstable&)> filter);
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false);
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false, tasks::task_info parent_info = {});
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
static future<> lock_table(global_table_ptr&, sharded<sstables::sstable_directory>& dir);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw,
db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf);
static future<> populate_keyspace(sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name);
static future<> populate_keyspace(sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name,
std::optional<service::intended_storage_mode> storage_mode = std::nullopt);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg,
noncopyable_function<future<>(global_table_ptr&, sharded<sstables::sstable_directory>&)> start_dir);

View File

@@ -225,6 +225,7 @@ storage_service::storage_service(abort_source& abort_source,
, _node_ops_module(make_shared<node_ops::task_manager_module>(tm, *this))
, _tablets_module(make_shared<service::task_manager_module>(tm, *this))
, _global_topology_requests_module(make_shared<service::topo::task_manager_module>(tm))
, _vnodes_to_tablets_migration_module(make_shared<service::vnodes_to_tablets::task_manager_module>(tm, *this))
, _address_map(address_map)
, _shared_token_metadata(stm)
, _erm_factory(erm_factory)
@@ -250,10 +251,12 @@ storage_service::storage_service(abort_source& abort_source,
tm.register_module(_node_ops_module->get_name(), _node_ops_module);
tm.register_module(_tablets_module->get_name(), _tablets_module);
tm.register_module(_global_topology_requests_module->get_name(), _global_topology_requests_module);
tm.register_module(_vnodes_to_tablets_migration_module->get_name(), _vnodes_to_tablets_migration_module);
if (this_shard_id() == 0) {
_node_ops_module->make_virtual_task<node_ops::node_ops_virtual_task>(*this);
_tablets_module->make_virtual_task<service::tablet_virtual_task>(*this);
_global_topology_requests_module->make_virtual_task<service::topo::global_topology_request_virtual_task>(*this);
_vnodes_to_tablets_migration_module->make_virtual_task<service::vnodes_to_tablets::migration_virtual_task>(*this);
}
register_metrics();
@@ -2342,6 +2345,7 @@ future<> storage_service::stop() {
co_await _tablets_module->stop();
co_await _node_ops_module->stop();
co_await _global_topology_requests_module->stop();
co_await _vnodes_to_tablets_migration_module->stop();
co_await _async_gate.close();
_tablet_split_monitor_event.signal();
co_await std::move(_tablet_split_monitor);
@@ -4109,22 +4113,12 @@ future<> storage_service::set_node_intended_storage_mode(intended_storage_mode m
slogger.info("Successfully set intended storage mode for node {} to {}", raft_server.id(), mode);
}
future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status(const sstring& ks_name) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
return ss.get_tablets_migration_status(ks_name);
});
}
storage_service::migration_status storage_service::get_tablets_migration_status(const sstring& ks_name) {
auto& db = _db.local();
auto& ks = db.find_keyspace(ks_name);
keyspace_migration_status result;
result.keyspace = ks_name;
if (ks.uses_tablets()) {
result.status = "tablets";
co_return result;
return migration_status::tablets;
}
const auto& tm = get_token_metadata();
@@ -4138,14 +4132,38 @@ future<storage_service::keyspace_migration_status> storage_service::get_tablets_
});
if (!has_tablet_maps) {
result.status = "vnodes";
return migration_status::vnodes;
}
return migration_status::migrating_to_tablets;
}
future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status_with_node_details(const sstring& ks_name) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
return ss.get_tablets_migration_status_with_node_details(ks_name);
});
}
keyspace_migration_status result;
result.keyspace = ks_name;
result.status = get_tablets_migration_status(ks_name);
if (result.status != migration_status::migrating_to_tablets) {
co_return result;
}
result.status = "migrating_to_tablets";
// system.tablet_sizes is a group0_virtual_table, so it requires group0 to
// be initialized. If this function is called via the task manager API,
// group0 may not be initialized yet.
if (!_group0 || !_group0->joined_group0()) {
throw std::runtime_error(::format("Cannot fetch node statuses for migrating keyspace '{}': group0 is not yet initialized on this node", ks_name));
}
// Pick one table and query system.tablet_sizes to find which nodes
// report tablet sizes (i.e. have loaded tablet-based ERMs).
auto& ks = _db.local().find_keyspace(ks_name);
auto tables = ks.metadata()->tables();
auto sample_table_id = tables.front()->id();
// FIXME: system.tablet_sizes might return stale data (load stats in the topology coordinator are cached).

View File

@@ -228,6 +228,7 @@ private:
shared_ptr<node_ops::task_manager_module> _node_ops_module;
shared_ptr<service::task_manager_module> _tablets_module;
shared_ptr<service::topo::task_manager_module> _global_topology_requests_module;
shared_ptr<service::vnodes_to_tablets::task_manager_module> _vnodes_to_tablets_migration_module;
gms::gossip_address_map& _address_map;
future<service::tablet_operation_result> do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
@@ -297,13 +298,20 @@ public:
sstring intended_mode; // "vnodes" or "tablets"
};
// Coarse storage state of a keyspace with respect to the vnodes-to-tablets
// migration.
enum class migration_status {
    vnodes,               // keyspace is entirely on vnodes
    migrating_to_tablets, // migration currently in progress
    tablets,              // keyspace uses tablets (migration finished, or created with tablets)
};
struct keyspace_migration_status {
sstring keyspace;
sstring status; // "vnodes", "migrating_to_tablets", or "tablets"
migration_status status;
std::vector<node_migration_status> nodes;
};
future<keyspace_migration_status> get_tablets_migration_status(const sstring& ks_name);
migration_status get_tablets_migration_status(const sstring& ks_name);
future<keyspace_migration_status> get_tablets_migration_status_with_node_details(const sstring& ks_name);
future<> set_node_intended_storage_mode(intended_storage_mode mode);
future<> finalize_tablets_migration(const sstring& ks_name);
@@ -1054,6 +1062,7 @@ public:
friend class tasks::task_manager;
friend class tablet_virtual_task;
friend class topo::global_topology_request_virtual_task;
friend class vnodes_to_tablets::migration_virtual_task;
};
}
@@ -1080,3 +1089,18 @@ struct fmt::formatter<service::storage_service::mode> : fmt::formatter<string_vi
return fmt::format_to(ctx.out(), "{}", name);
}
};
// Formats storage_service::migration_status as its API string:
// "vnodes", "migrating_to_tablets" or "tablets".
template <>
struct fmt::formatter<service::storage_service::migration_status> : fmt::formatter<string_view> {
    template <typename FormatContext>
    auto format(service::storage_service::migration_status status, FormatContext& ctx) const {
        std::string_view name;
        using enum service::storage_service::migration_status;
        switch (status) {
        case vnodes: name = "vnodes"; break;
        case migrating_to_tablets: name = "migrating_to_tablets"; break;
        case tablets: name = "tablets"; break;
        }
        return fmt::format_to(ctx.out(), "{}", name);
    }
};

View File

@@ -15,6 +15,7 @@
#include "service/topology_state_machine.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/UUID_gen.hh"
#include <seastar/coroutine/maybe_yield.hh>
namespace service {
@@ -469,5 +470,188 @@ task_manager_module::task_manager_module(tasks::task_manager& tm) noexcept
}
namespace vnodes_to_tablets {
// Computes the deterministic task id for the migration virtual task of a
// given keyspace. A name-based (v3) UUID is used so that the same keyspace
// always maps to the same task id, which lets contains() recognize it later.
tasks::task_id migration_virtual_task::make_task_id(const sstring& keyspace) {
    // Prefix the keyspace name with a unique identifier for this task to avoid
    // collisions with other named UUIDs that may be added in the future.
    auto uuid = utils::UUID_gen::get_name_UUID("vnodes_to_tablets_migration:" + keyspace);
    return tasks::task_id{uuid};
}
// Reverse-maps a task id back to the keyspace it was derived from, by
// recomputing the name-based UUID for every known keyspace and comparing.
// Returns std::nullopt when no keyspace matches.
std::optional<sstring> migration_virtual_task::find_keyspace_for_task_id(tasks::task_id id) const {
    auto& db = _ss._db.local();
    // Note: We could use a cache here, but it's probably an overkill.
    // Scylla supports up to 1000 keyspaces, which is a relatively small number
    // for UUID computation.
    for (const auto& ks_name : db.get_all_keyspaces()) {
        if (make_task_id(ks_name) == id) {
            return ks_name;
        }
    }
    return std::nullopt;
}
// Task group used by the task manager to classify this virtual task.
tasks::task_manager::task_group migration_virtual_task::get_group() const noexcept {
    return tasks::task_manager::task_group::vnodes_to_tablets_migration_group;
}
// Checks whether the given task id belongs to this virtual task, i.e.
// whether it identifies a keyspace whose migration is currently in progress.
// On a match, returns a hint carrying the keyspace name so that subsequent
// calls (get_status/wait) can skip the reverse UUID lookup.
future<std::optional<tasks::virtual_task_hint>> migration_virtual_task::contains(tasks::task_id task_id) const {
    if (!task_id.uuid().is_name_based()) {
        // Task id of migration task is always a named UUID.
        co_return std::nullopt;
    }
    auto ks_name = find_keyspace_for_task_id(task_id);
    if (!ks_name) {
        co_return std::nullopt;
    }
    // The task only exists while the keyspace is actively migrating.
    auto status = _ss.get_tablets_migration_status(*ks_name);
    if (status != storage_service::migration_status::migrating_to_tablets) {
        co_return std::nullopt;
    }
    co_return tasks::virtual_task_hint{.keyspace_name = *ks_name};
}
// Builds the task_stats entry shown by "tasks list" for a migrating keyspace.
// The task is presented as a cluster-wide, keyspace-scoped task; since a
// virtual task only exists while the migration is in progress, its state is
// always "running" and the timestamps are left empty.
tasks::task_stats migration_virtual_task::make_task_stats(tasks::task_id id, const sstring& keyspace) {
    return tasks::task_stats{
        .task_id = id,
        .type = "vnodes_to_tablets_migration",
        .kind = tasks::task_kind::cluster,
        .scope = "keyspace",
        .state = tasks::task_manager::task_state::running,
        .sequence_number = 0,
        .keyspace = keyspace,
        .table = "",
        .entity = "",
        .shard = 0,
        .start_time = {},
        .end_time = {},
    };
}
// Expands the task_stats of a migrating keyspace into a full task_status with
// the given state and progress. The task is never abortable (rollback is a
// manual procedure), has no parent, and measures progress in nodes.
tasks::task_status migration_virtual_task::make_task_status(tasks::task_id id, const sstring& keyspace,
        tasks::task_manager::task_state state,
        tasks::task_manager::task::progress progress) {
    auto stats = make_task_stats(id, keyspace);
    return tasks::task_status{
        .task_id = stats.task_id,
        .type = stats.type,
        .kind = stats.kind,
        .scope = stats.scope,
        .state = state,
        .is_abortable = tasks::is_abortable::no,
        .start_time = stats.start_time,
        .end_time = stats.end_time,
        .parent_id = tasks::task_id::create_null_id(),
        .sequence_number = stats.sequence_number,
        .shard = stats.shard,
        .keyspace = stats.keyspace,
        .table = stats.table,
        .entity = stats.entity,
        .progress_units = "nodes",
        .progress = progress,
    };
}
// Reports the current status of the migration task identified by the hint.
// Progress counts the nodes whose current mode is "tablets" out of all nodes
// listed in the migration status; it grows on the forward path and shrinks on
// rollback. Returns std::nullopt if the hint carries no keyspace, the
// keyspace was dropped, or the keyspace is no longer migrating.
future<std::optional<tasks::task_status>> migration_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto& ks_name = hint.keyspace_name;
    if (!ks_name) {
        co_return std::nullopt;
    }
    storage_service::keyspace_migration_status status;
    try {
        status = co_await _ss.get_tablets_migration_status_with_node_details(*ks_name);
    } catch (const replica::no_such_keyspace&) {
        // Keyspace may have been dropped concurrently.
        co_return std::nullopt;
    }
    if (status.status != storage_service::migration_status::migrating_to_tablets) {
        co_return std::nullopt;
    }
    // The progress tracks the number of nodes currently using tablets.
    // During forward migration it increases; during rollback it decreases.
    double nodes_upgraded = 0;
    double total_nodes = status.nodes.size();
    for (const auto& node : status.nodes) {
        if (node.current_mode == "tablets") {
            nodes_upgraded++;
        }
    }
    auto task_state = tasks::task_manager::task_state::running;
    auto task_progress = tasks::task_manager::task::progress{nodes_upgraded, total_nodes};
    // Note: Children are left empty. Although the resharding tasks are children
    // of the migration task, using get_children() here would be pointless
    // because resharding runs during startup before start_listen() is called,
    // so the RPC fan-out to collect children from all nodes (including self)
    // would fail.
    co_return make_task_status(id, *ks_name, task_state, task_progress);
}
// Blocks until the keyspace named in the hint leaves the
// "migrating_to_tablets" state, re-checking after every topology state
// machine event. Returns a final status with state "done" when the keyspace
// ended up on tablets, or "suspended" when it was rolled back to vnodes.
// Returns std::nullopt if the hint carries no keyspace or the keyspace was
// dropped meanwhile.
future<std::optional<tasks::task_status>> migration_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto& ks_name = hint.keyspace_name;
    if (!ks_name) {
        co_return std::nullopt;
    }
    storage_service::migration_status status;
    while (true) {
        try {
            status = _ss.get_tablets_migration_status(*ks_name);
        } catch (const replica::no_such_keyspace&) {
            co_return std::nullopt;
        }
        if (status != storage_service::migration_status::migrating_to_tablets) {
            break;
        }
        // Sleep until the next topology state change, then re-evaluate.
        co_await _ss._topology_state_machine.event.wait();
    }
    bool migration_succeeded = status == storage_service::migration_status::tablets;
    auto state = migration_succeeded ? tasks::task_manager::task_state::done : tasks::task_manager::task_state::suspended;
    // Final progress: all normal nodes on success, zero on rollback.
    double total_nodes = _ss._topology_state_machine._topology.normal_nodes.size();
    auto task_progress = tasks::task_manager::task::progress{migration_succeeded ? total_nodes : 0, total_nodes};
    auto task_status = make_task_status(id, *ks_name, state, task_progress);
    task_status.end_time = db_clock::now();
    co_return task_status;
}
// Intentionally a no-op; the task advertises is_abortable = no.
future<> migration_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
    // Vnodes-to-tablets migration cannot be aborted via the task manager.
    // It requires a manual rollback procedure.
    return make_ready_future<>();
}
// Lists one task per keyspace whose migration is currently in progress.
// Keyspaces in the plain "vnodes" or "tablets" state are not listed.
future<std::vector<tasks::task_stats>> migration_virtual_task::get_stats() {
    std::vector<tasks::task_stats> result;
    auto& db = _ss._db.local();
    for (const auto& ks_name : db.get_all_keyspaces()) {
        storage_service::migration_status status;
        try {
            status = _ss.get_tablets_migration_status(ks_name);
        } catch (const replica::no_such_keyspace&) {
            // The keyspace may be dropped concurrently; skip it.
            continue;
        }
        if (status != storage_service::migration_status::migrating_to_tablets) {
            continue;
        }
        result.push_back(make_task_stats(make_task_id(ks_name), ks_name));
    }
    co_return result;
}
// Task manager module hosting the vnodes-to-tablets migration virtual task,
// registered under the "vnodes_to_tablets_migration" name.
task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
    : tasks::task_manager::module(tm, "vnodes_to_tablets_migration")
    , _ss(ss)
{}
// Returns the set of cluster nodes relevant to this module's tasks;
// delegates to the task manager, which derives it from the storage service.
std::set<locator::host_id> task_manager_module::get_nodes() const {
    return get_task_manager().get_nodes(_ss);
}
}
}

View File

@@ -83,4 +83,42 @@ public:
}
namespace vnodes_to_tablets {
// Virtual task exposing in-progress vnodes-to-tablets migrations through the
// task manager API. Each migrating keyspace appears as one cluster-scoped
// task whose id is a deterministic name-based (v3) UUID derived from the
// keyspace name.
class migration_virtual_task : public tasks::task_manager::virtual_task::impl {
private:
    service::storage_service& _ss;
public:
    migration_virtual_task(tasks::task_manager::module_ptr module,
            service::storage_service& ss)
        : tasks::task_manager::virtual_task::impl(std::move(module))
        , _ss(ss)
    {}
    virtual tasks::task_manager::task_group get_group() const noexcept override;
    // Matches a task id against the currently-migrating keyspaces.
    virtual future<std::optional<tasks::virtual_task_hint>> contains(tasks::task_id task_id) const override;
    virtual future<std::optional<tasks::task_status>> get_status(tasks::task_id id, tasks::virtual_task_hint hint) override;
    // Blocks until the migration completes ("done") or is rolled back ("suspended").
    virtual future<std::optional<tasks::task_status>> wait(tasks::task_id id, tasks::virtual_task_hint hint) override;
    // No-op: the migration cannot be aborted via the task manager.
    virtual future<> abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept override;
    virtual future<std::vector<tasks::task_stats>> get_stats() override;
    // Deterministic, name-based task id derived from the keyspace name.
    static tasks::task_id make_task_id(const sstring& keyspace);
private:
    std::optional<sstring> find_keyspace_for_task_id(tasks::task_id id) const;
    static tasks::task_stats make_task_stats(tasks::task_id id, const sstring& keyspace);
    static tasks::task_status make_task_status(tasks::task_id id, const sstring& keyspace,
            tasks::task_manager::task_state state,
            tasks::task_manager::task::progress progress);
};
// Module that registers the vnodes-to-tablets migration virtual task with
// the task manager.
class task_manager_module : public tasks::task_manager::module {
private:
    service::storage_service& _ss;
public:
    task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
    // Set of cluster nodes relevant to tasks in this module.
    std::set<locator::host_id> get_nodes() const override;
};
}
}

View File

@@ -114,6 +114,7 @@ public:
topology_change_group,
tablets_group,
global_topology_change_group,
vnodes_to_tablets_migration_group,
};
class task : public enable_lw_shared_from_this<task> {

View File

@@ -18,6 +18,7 @@ struct virtual_task_hint {
std::optional<locator::tablet_task_type> task_type;
std::optional<locator::tablet_id> tablet_id;
std::optional<locator::host_id> node_id;
std::optional<sstring> keyspace_name;
locator::tablet_task_type get_task_type() const;
locator::tablet_id get_tablet_id() const;

View File

@@ -18,6 +18,17 @@ BOOST_AUTO_TEST_CASE(test_generation_of_name_based_UUID) {
BOOST_REQUIRE_EQUAL(fmt::to_string(uuid), "0290003c-977e-397c-ac3e-fdfdc01d626b");
}
// UUID::is_name_based() must return true exactly for name-based (v3) UUIDs
// and false for null, random (v4) and time-based (v1) UUIDs.
BOOST_AUTO_TEST_CASE(test_is_name_based) {
    // Verify that a name-based UUID is identified as such
    auto uuid = utils::UUID_gen::get_name_UUID("systembatchlog");
    BOOST_CHECK(uuid.is_name_based());

    // Verify that other UUID types are not name-based
    BOOST_CHECK(!utils::null_uuid().is_name_based());
    BOOST_CHECK(!utils::make_random_uuid().is_name_based());
    BOOST_CHECK(!utils::UUID_gen::get_time_UUID().is_name_based());
}
using utils::UUID;
BOOST_AUTO_TEST_CASE(test_UUID_comparison) {

View File

@@ -17,6 +17,7 @@ from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.cluster.util import new_test_keyspace, reconnect_driver
from test.cluster.tasks.task_manager_client import TaskManagerClient
logger = logging.getLogger(__name__)
@@ -110,11 +111,31 @@ async def verify_migration_status(manager: ManagerClient, server: ServerInfo,
expected_node_statuses: dict[str, tuple[str, str]],
retries: int = 0, retry_interval: float = 0):
async def _check():
# Verify migration status via the migration status API
status = await manager.api.get_vnode_tablet_migration_status(server.ip_addr, ks)
actual_node_statuses = {n['host_id']: (n['current_mode'], n['intended_mode']) for n in status['nodes']}
assert status['status'] == expected_status, f"Expected migration status '{expected_status}', got '{status['status']}'"
assert actual_node_statuses == expected_node_statuses, f"Expected node statuses {expected_node_statuses}, got {actual_node_statuses}"
# Verify migration status via the tasks API
tm = TaskManagerClient(manager.api)
tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration")
ks_tasks = [t for t in tasks if t.keyspace == ks]
if expected_status == "migrating_to_tablets":
assert len(ks_tasks) == 1, f"Expected 1 virtual task for keyspace '{ks}', got {len(ks_tasks)}"
task = ks_tasks[0]
assert task.state == "running", f"Expected task state 'running', got '{task.state}'"
assert task.type == "vnodes_to_tablets_migration", f"Expected task type 'vnodes_to_tablets_migration', got '{task.type}'"
task_status = await tm.get_task_status(server.ip_addr, task.task_id)
expected_total = len(expected_node_statuses)
expected_completed = sum(1 for cm, _ in expected_node_statuses.values() if cm == 'tablets')
assert task_status.progress_total == expected_total, f"Expected progress_total {expected_total}, got {task_status.progress_total}"
assert task_status.progress_completed == expected_completed, f"Expected progress_completed {expected_completed}, got {task_status.progress_completed}"
else:
assert len(ks_tasks) == 0, f"Expected no virtual tasks for keyspace '{ks}' when status is '{expected_status}', got {len(ks_tasks)}"
for attempt in range(retries + 1):
try:
await _check()
@@ -632,6 +653,96 @@ async def test_migration_finalize_before_upgrade(manager: ManagerClient):
await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks)
@pytest.mark.asyncio
async def test_migration_task_not_abortable(manager: ManagerClient):
    """Verify that aborting a vnodes-to-tablets migration task via the task manager fails."""
    server, cql = await setup_single_node(manager)
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.t (pk int PRIMARY KEY)")
        # Start the migration so the virtual task appears.
        await manager.api.create_vnode_tablet_migration(server.ip_addr, ks)
        tm = TaskManagerClient(manager.api)
        tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration")
        ks_tasks = [t for t in tasks if t.keyspace == ks]
        assert len(ks_tasks) == 1, f"Expected 1 migration task for keyspace '{ks}', got {len(ks_tasks)}"
        task = ks_tasks[0]
        # The status must advertise the task as non-abortable...
        status = await tm.get_task_status(server.ip_addr, task.task_id)
        assert status.is_abortable is False, f"Expected task to be non-abortable, got is_abortable={status.is_abortable}"
        # ...and an explicit abort request must be rejected with an HTTP error.
        with pytest.raises(HTTPError, match="cannot be aborted"):
            await tm.abort_task(server.ip_addr, task.task_id)
        # Rollback: schema changes are not yet supported for migrating keyspaces.
        # TODO: Remove this once support is added.
        await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks)
@pytest.mark.asyncio
async def test_migration_wait_task(manager: ManagerClient):
    """Verify that the task manager "wait" API works for vnodes-to-tablets migration tasks.

    Exercises two scenarios:
    1. Wait on a migration task that is rolled back — expect "suspended" state.
    2. Wait on a migration task that completes successfully — expect "done" state.
    """
    server, cql = await setup_single_node(manager)
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.t (pk int PRIMARY KEY)")
        tm = TaskManagerClient(manager.api)

        # Scenario 1: wait + rollback
        logger.info("Starting migration")
        await manager.api.create_vnode_tablet_migration(server.ip_addr, ks)
        tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration")
        assert len(tasks) == 1
        assert tasks[0].keyspace == ks
        task_id = tasks[0].task_id
        logger.info("Starting wait on the migration task")
        wait_task = asyncio.create_task(tm.wait_for_task(server.ip_addr, task_id))
        logger.info("Rolling back migration")
        await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks)
        logger.info("Expecting wait to finish with 'suspended' state")
        wait_status = await wait_task
        assert wait_status.state == "suspended", f"Expected 'suspended' after rollback, got '{wait_status.state}'"
        assert wait_status.progress_completed == 0, f"Expected 0 upgraded nodes for rolled back migration, got {wait_status.progress_completed}"

        # Scenario 2: wait + full migration
        logger.info("Starting migration again")
        await manager.api.create_vnode_tablet_migration(server.ip_addr, ks)
        tasks = await tm.list_tasks(server.ip_addr, "vnodes_to_tablets_migration")
        assert len(tasks) == 1
        assert tasks[0].keyspace == ks
        task_id = tasks[0].task_id
        logger.info("Marking the node for tablets migration and restarting")
        await manager.api.upgrade_node_to_tablets(server.ip_addr)
        await manager.server_restart(server.server_id)
        await reconnect_driver(manager)
        cql, _ = await manager.get_ready_cql([server])
        logger.info("Starting wait on the migration task")
        wait_task = asyncio.create_task(tm.wait_for_task(server.ip_addr, task_id))
        logger.info("Finalizing migration")
        await manager.api.finalize_vnode_tablet_migration(server.ip_addr, ks)
        logger.info("Expecting wait to finish with 'done' state")
        wait_status = await wait_task
        assert wait_status.state == "done", f"Expected 'done' after finalization, got '{wait_status.state}'"
        assert wait_status.progress_completed == 1, f"Expected 1 upgraded node for completed migration, got {wait_status.progress_completed}"
@pytest.mark.asyncio
async def test_migration_multiple_keyspaces(manager: ManagerClient):
"""Verify that two keyspaces can be migrated from vnodes to tablets simultaneously."""

View File

@@ -52,6 +52,10 @@ public:
return version() == 1;
}
// Returns true iff this is a name-based UUID (RFC 4122 version 3), such as
// those produced by UUID_gen::get_name_UUID().
bool is_name_based() const noexcept {
    return version() == 3;
}
int64_t timestamp() const noexcept {
//if (version() != 1) {
// throw new UnsupportedOperationException("Not a time-based UUID");