diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 6d6624999a..92377b0b76 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -324,6 +324,7 @@ schema_ptr system_keyspace::topology_requests() {
             .with_column("snapshot_tag", utf8_type)
             .with_column("snapshot_expiry", timestamp_type)
             .with_column("snapshot_skip_flush", boolean_type)
+            .with_column("finalize_migration_ks_name", utf8_type)
             .set_comment("Topology request tracking")
             .with_hash_version()
             .build();
@@ -3512,6 +3513,9 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
             entry.snapshot_expiry = row.get_as("snapshot_expiry");
         }
     }
+    if (row.has("finalize_migration_ks_name")) {
+        entry.finalize_migration_ks_name = row.get_as("finalize_migration_ks_name");
+    }
     return entry;
 }
 
diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
index ffd89d93b7..67a32fba95 100644
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -427,6 +427,8 @@ public:
         std::optional snapshot_tag;
         std::optional snapshot_expiry;
         bool snapshot_skip_flush;
+        // Value of the "finalize_migration_ks_name" column; left empty when the row does not have it.
+        std::optional finalize_migration_ks_name;
     };
 
     using topology_requests_entries = std::unordered_map;
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
index 459236aca9..d60a3fd8a9 100644
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -1169,6 +1169,175 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
                 co_await update_topology_state(std::move(guard), {builder.build()}, "SNAPSHOT TABLES requested");
             }
             break;
+        // Handles a finalize_migration request for one keyspace: either switches
+        // the keyspace to tablets or drops its tablet maps (rollback to vnodes).
+        // Failures caught below are stored via done(error) and the request is
+        // still removed from the global request queue.
+        case global_topology_request::finalize_migration: {
+            rtlogger.info("finalize_migration requested");
+
+            // A tracking row without a keyspace name must not be dereferenced; it is
+            // reported as a failed request instead.
+            auto ks_name = req_entry.finalize_migration_ks_name.value_or(sstring());
+            utils::chunked_vector updates;
+            sstring error;
+
+            if (!req_entry.finalize_migration_ks_name) {
+                error = "finalize_migration request does not specify a keyspace name";
+            } else if (_db.has_keyspace(ks_name)) {
+                try {
+                    auto& ks = _db.find_keyspace(ks_name);
+                    if (ks.uses_tablets()) {
+                        throw std::runtime_error(fmt::format("Keyspace '{}' already uses tablets", ks_name));
+                    }
+
+                    auto tmptr = get_token_metadata_ptr();
+                    const auto& tablet_metadata = tmptr->tablets();
+                    auto tables = ks.metadata()->tables();
+
+                    // Verify all tables have tablet maps.
+                    for (const auto& schema : tables) {
+                        if (!tablet_metadata.has_tablet_map(schema->id())) {
+                            throw std::runtime_error(fmt::format(
+                                    "Table {}.{} does not have a tablet map", ks_name, schema->cf_name()));
+                        }
+                    }
+
+                    // Find the migration direction (tablets or rollback to vnodes).
+                    // Nodes that haven't set their intended mode are treated as vnodes (the default).
+                    std::optional global_intended_mode;
+                    for (const auto& [server_id, replica_state] : _topo_sm._topology.normal_nodes) {
+                        auto replica_intended_mode = replica_state.storage_mode ? *replica_state.storage_mode : intended_storage_mode::vnodes;
+                        if (!global_intended_mode) {
+                            global_intended_mode = replica_intended_mode;
+                        } else if (replica_intended_mode != *global_intended_mode) {
+                            throw std::runtime_error(fmt::format(
+                                    "Cannot finalize migration for keyspace '{}': node {} has intended storage mode '{}', expected '{}'",
+                                    ks_name, server_id, replica_intended_mode, *global_intended_mode));
+                        }
+                    }
+                    if (!global_intended_mode) {
+                        on_internal_error(rtlogger, fmt::format(
+                                "finalize_migration: no normal nodes found while finalizing migration for keyspace '{}'", ks_name));
+                    }
+                    bool rollback = *global_intended_mode == intended_storage_mode::vnodes;
+
+                    rtlogger.info("Finalizing migration for keyspace '{}': direction={}",
+                            ks_name, rollback ? "rollback to vnodes" : "forward to tablets");
+
+                    co_await _tablet_load_stats_refresh.trigger();
+
+                    // Verify that the actual storage mode matches the intended mode for all normal nodes.
+                    // A node that has migrated a table's storage to tablets will report it in its load_stats.
+                    for (const auto& [node_id, _] : _topo_sm._topology.normal_nodes) {
+                        auto host_id = to_host_id(node_id);
+                        auto it = _load_stats_per_node.find(host_id);
+                        if (!rollback) { // forward path (vnodes to tablets)
+                            if (it == _load_stats_per_node.end()) {
+                                throw std::runtime_error(fmt::format(
+                                        "No load stats available for node {}", host_id));
+                            }
+                            const auto& node_stats = it->second;
+                            for (const auto& schema : tables) {
+                                if (!node_stats.tables.contains(schema->id())) {
+                                    throw std::runtime_error(fmt::format(
+                                            "Node {} has not yet migrated table {}.{} to tablets",
+                                            host_id, ks_name, schema->cf_name()));
+                                }
+                            }
+                        } else { // rollback path (tablets to vnodes)
+                            // NOTE(review): a node without a load stats entry passes this check
+                            // unverified, unlike the forward path - confirm this is intended.
+                            if (it != _load_stats_per_node.end()) {
+                                const auto& node_stats = it->second;
+                                for (const auto& schema : tables) {
+                                    if (node_stats.tables.contains(schema->id())) {
+                                        throw std::runtime_error(fmt::format(
+                                                "Node {} still reports table {}.{} as tablet-based, rollback not complete",
+                                                host_id, ks_name, schema->cf_name()));
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    if (!rollback) {
+                        // All nodes have been migrated. ALTER the keyspace to use tablets.
+                        auto old_md = ks.metadata();
+                        auto new_md = data_dictionary::keyspace_metadata::new_keyspace(
+                                old_md->name(),
+                                old_md->strategy_name(),
+                                old_md->strategy_options(),
+                                std::optional(0), // initial_tablets=0 means auto
+                                old_md->consistency_option(),
+                                old_md->durable_writes(),
+                                old_md->get_storage_options());
+                        auto schema_muts = prepare_keyspace_update_announcement(_db, new_md, guard.write_timestamp());
+                        for (auto& m : schema_muts) {
+                            updates.emplace_back(m);
+                        }
+                    } else {
+                        // Rollback: delete tablet maps for all tables in the keyspace.
+                        for (const auto& schema : tables) {
+                            updates.emplace_back(replica::make_drop_tablet_map_mutation(schema->id(), guard.write_timestamp()));
+                        }
+                    }
+                } catch (const std::exception& e) {
+                    error = e.what();
+                    rtlogger.error("Couldn't process global_topology_request::finalize_migration for keyspace '{}': {}",
+                            ks_name, std::current_exception());
+                    // Discard partially built mutations; only the request bookkeeping below is committed.
+                    updates.clear();
+                }
+            } else {
+                error = fmt::format("Keyspace '{}' does not exist", ks_name);
+            }
+
+            topology_mutation_builder tbuilder(guard.write_timestamp());
+            tbuilder.del_global_topology_request()
+                    .del_global_topology_request_id()
+                    .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
+
+            if (error.empty()) {
+                // Only clear intended_storage_mode if no other keyspace is still under migration.
+                auto tmptr = get_token_metadata_ptr();
+                const auto& tmd = tmptr->tablets();
+                bool has_other_migrating_ks = false;
+                for (const auto& other_ks_name : _db.get_non_system_keyspaces()) {
+                    if (other_ks_name == ks_name) {
+                        continue;
+                    }
+                    auto& other_ks = _db.find_keyspace(other_ks_name);
+                    if (other_ks.uses_tablets()) {
+                        continue;
+                    }
+                    bool other_ks_has_tablet_map = std::ranges::any_of(other_ks.metadata()->tables(), [&](const auto& s) {
+                        return tmd.has_tablet_map(s->id());
+                    });
+                    if (other_ks_has_tablet_map) {
+                        has_other_migrating_ks = true;
+                        break;
+                    }
+                }
+                if (!has_other_migrating_ks) {
+                    for (const auto& [node_id, _] : _topo_sm._topology.normal_nodes) {
+                        tbuilder.with_node(node_id).del("intended_storage_mode");
+                    }
+                }
+            }
+
+            updates.push_back(canonical_mutation(
+                    topology_request_tracking_mutation_builder(req_id)
+                            .done(error)
+                            .build()));
+            updates.push_back(canonical_mutation(tbuilder.build()));
+
+            sstring reason = fmt::format("finalize vnode-to-tablet migration for keyspace '{}'", ks_name);
+            mixed_change change{std::move(updates)};
+            group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
+            co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
+        }
+        break;
         }
     }
 
diff --git a/service/topology_mutation.cc b/service/topology_mutation.cc
index 8ac8d40ad1..41192492bb 100644
--- a/service/topology_mutation.cc
+++ b/service/topology_mutation.cc
@@ -385,6 +385,14 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
     return *this;
 }
 
+// Stores `ks_name` under the "finalize_migration_ks_name" column via apply_atomic()
+// and returns *this so calls can be chained.
+topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_finalize_migration_data(
+        const sstring& ks_name) {
+    apply_atomic("finalize_migration_ks_name", ks_name);
+    return *this;
+}
+
 template class topology_mutation_builder_base;
 template class topology_mutation_builder_base;
 
diff --git a/service/topology_mutation.hh b/service/topology_mutation.hh
index f7ad373bf6..f9564bbd83 100644
--- a/service/topology_mutation.hh
+++ b/service/topology_mutation.hh
@@ -160,6 +160,7 @@ public:
     topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id);
     topology_request_tracking_mutation_builder& set_new_keyspace_rf_change_data(const sstring& ks_name, const std::map& rf_per_dc);
     topology_request_tracking_mutation_builder& set_snapshot_tables_data(const std::unordered_set&, const sstring& tag, bool);
+    topology_request_tracking_mutation_builder& set_finalize_migration_data(const sstring& ks_name);
 
     canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
 };
diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc
index 1d9fef362c..92db15cf09 100644
--- a/service/topology_state_machine.cc
+++ b/service/topology_state_machine.cc
@@ -210,6 +210,7 @@ static std::unordered_map global_topology_requ
     {global_topology_request::truncate_table, "truncate_table"},
     {global_topology_request::snapshot_tables, "snapshot_tables"},
     {global_topology_request::noop_request, "noop_request"},
+    {global_topology_request::finalize_migration, "finalize_migration"},
 };
 
 global_topology_request global_topology_request_from_string(const sstring& s) {
diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh
index c27433d31a..69422fc7d0 100644
--- a/service/topology_state_machine.hh
+++ b/service/topology_state_machine.hh
@@ -88,6 +88,8 @@ enum class global_topology_request: uint16_t {
     // Ensures that all later requests and tablet scheduler will see prior updates to group0.
     noop_request,
     snapshot_tables,
+    // Finalizes a keyspace's migration between vnodes and tablets (forward, or rollback to vnodes).
+    finalize_migration,
 };
 
 struct ring_slice {