From dd83666733303c2ee5fabb04708d01fa0842440e Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 28 Jan 2026 12:34:10 +0100 Subject: [PATCH] service: rearrange keyspace_rf_change handler In the following changes, keyspace_rf_change handler will also consider a change of RF by more than one. Rearrange the handler, so that it first chooses a kind of RF change and then creates relevant updates. Do not wrap the code in schedule_migration function, as we no longer need a quick return possibility. --- service/topology_coordinator.cc | 98 +++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 34 deletions(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 89f604abb9..187af58f34 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -960,6 +960,40 @@ class topology_coordinator : public endpoint_lifecycle_subscriber } } + enum class keyspace_rf_change_kind { + default_rf_change, + conversion_to_rack_list, + }; + + future choose_keyspace_rf_change_kind(utils::UUID req_id, + lw_shared_ptr old_ks_md, + lw_shared_ptr new_ks_md, + const std::vector& tables_with_mvs) { + const auto& new_replication_strategy_config = new_ks_md->strategy_options(); + const auto& old_replication_strategy_config = old_ks_md->strategy_options(); + auto check_needs_colocation = [&] () -> future { + bool rack_list_conversion = false; + for (const auto& [dc, rf_value] : new_replication_strategy_config) { + if (std::holds_alternative(rf_value)) { + auto it = old_replication_strategy_config.find(dc); + if (it != old_replication_strategy_config.end() && std::holds_alternative(it->second)) { + rack_list_conversion = true; + break; + } + } + } + co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, get_token_metadata_ptr(), &_sys_ks, req_id) : false; + }; + + if (tables_with_mvs.empty()) { + co_return keyspace_rf_change_kind::default_rf_change; + } + if (co_await check_needs_colocation()) { + co_return keyspace_rf_change_kind::conversion_to_rack_list; + } + co_return keyspace_rf_change_kind::default_rf_change; + } + // Precondition: there is no node request and no ongoing topology transition // (checked under the guard we're holding). future<> handle_global_request(group0_guard guard) { @@ -1015,9 +1049,18 @@ class topology_coordinator : public endpoint_lifecycle_subscriber saved_ks_props = *req_entry.new_keyspace_rf_change_data; } + auto tbuilder_with_request_drop = [&] () { + topology_mutation_builder tbuilder(guard.write_timestamp()); + tbuilder.set_transition_state(topology::transition_state::tablet_migration) + .set_version(_topo_sm._topology.version + 1) + .del_global_topology_request() + .del_global_topology_request_id() + .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id); + return tbuilder; + }; + utils::chunked_vector updates; sstring error; - bool needs_colocation = false; if (_db.has_keyspace(ks_name)) { try { auto& ks = _db.find_keyspace(ks_name); @@ -1029,34 +1072,18 @@ class topology_coordinator : public endpoint_lifecycle_subscriber size_t unimportant_init_tablet_count = 2; // must be a power of 2 locator::tablet_map new_tablet_map{unimportant_init_tablet_count}; - auto schedule_migrations = [&] () -> future<> { auto tables_with_mvs = ks.metadata()->tables(); auto views = ks.metadata()->views(); tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end()); + auto rf_change_kind = co_await choose_keyspace_rf_change_kind(req_id, ks.metadata(), ks_md, tables_with_mvs); + switch (rf_change_kind) { + case keyspace_rf_change_kind::default_rf_change: { if (!tables_with_mvs.empty()) { auto table = tables_with_mvs.front(); auto tablet_count = tmptr->tablets().get_tablet_map(table->id()).tablet_count(); locator::replication_strategy_params params{ks_md->strategy_options(), tablet_count, ks.metadata()->consistency_option()}; auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, tmptr->get_topology()); - auto check_needs_colocation = [&] () -> future { - const auto& new_replication_strategy_config = new_strategy->get_config_options(); - const auto& old_replication_strategy_config = ks.metadata()->strategy_options(); - bool rack_list_conversion = false; - for (const auto& [dc, rf_value] : new_replication_strategy_config) { - if (std::holds_alternative(rf_value)) { - auto it = old_replication_strategy_config.find(dc); - if (it != old_replication_strategy_config.end() && std::holds_alternative(it->second)) { - rack_list_conversion = true; - break; - } - } - } - co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, tmptr, &_sys_ks, req_id) : false; - }; - if (needs_colocation = co_await check_needs_colocation(); needs_colocation) { - co_return; - } for (const auto& table_or_mv : tables_with_mvs) { if (!tmptr->tablets().is_base_table(table_or_mv->id())) { // Apply the transition only on base tables. @@ -1103,8 +1130,21 @@ class topology_coordinator : public endpoint_lifecycle_subscriber for (auto& m: schema_muts) { updates.emplace_back(m); } - }; - co_await schedule_migrations(); + + updates.push_back(canonical_mutation(tbuilder_with_request_drop().build())); + updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id) + .done() + .build())); + break; + } + case keyspace_rf_change_kind::conversion_to_rack_list: { + rtlogger.info("keyspace_rf_change for keyspace {} postponed for colocation", ks_name); + topology_mutation_builder tbuilder = tbuilder_with_request_drop(); + tbuilder.pause_rf_change_request(req_id); + updates.push_back(canonical_mutation(tbuilder.build())); + break; + } + } } catch (const std::exception& e) { error = e.what(); rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, desired new ks opts: {}, error: {}", @@ -1115,22 +1155,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist"; } - bool pause_request = needs_colocation && error.empty(); - topology_mutation_builder tbuilder(guard.write_timestamp()); - tbuilder.set_transition_state(topology::transition_state::tablet_migration) - .set_version(_topo_sm._topology.version + 1) - .del_global_topology_request() - .del_global_topology_request_id() - .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id); - if (pause_request) { - rtlogger.info("keyspace_rf_change for keyspace {} postponed for colocation", ks_name); - tbuilder.pause_rf_change_request(req_id); - } else { + if (error != "") { + updates.push_back(canonical_mutation(tbuilder_with_request_drop().build())); updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id) .done(error) .build())); } - updates.push_back(canonical_mutation(tbuilder.build())); sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props); rtlogger.trace("do update {} reason {}", updates, reason);