service: rearrange keyspace_rf_change handler

In the following changes, keyspace_rf_change handler will also consider
a change of RF by more than one. Rearrange the handler, so that it
first chooses a kind of RF change and then creates relevant updates.

Do not wrap the code in schedule_migration function, as we no longer
need a quick return possibility.
This commit is contained in:
Aleksandra Martyniuk
2026-01-28 12:34:10 +01:00
parent 72bb3113ac
commit dd83666733

View File

@@ -960,6 +960,40 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
}
}
enum class keyspace_rf_change_kind {
default_rf_change,
conversion_to_rack_list,
};
future<keyspace_rf_change_kind> choose_keyspace_rf_change_kind(utils::UUID req_id,
lw_shared_ptr<keyspace_metadata> old_ks_md,
lw_shared_ptr<keyspace_metadata> new_ks_md,
const std::vector<schema_ptr>& tables_with_mvs) {
const auto& new_replication_strategy_config = new_ks_md->strategy_options();
const auto& old_replication_strategy_config = old_ks_md->strategy_options();
auto check_needs_colocation = [&] () -> future<bool> {
bool rack_list_conversion = false;
for (const auto& [dc, rf_value] : new_replication_strategy_config) {
if (std::holds_alternative<locator::rack_list>(rf_value)) {
auto it = old_replication_strategy_config.find(dc);
if (it != old_replication_strategy_config.end() && std::holds_alternative<sstring>(it->second)) {
rack_list_conversion = true;
break;
}
}
}
co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, get_token_metadata_ptr(), &_sys_ks, req_id) : false;
};
if (tables_with_mvs.empty()) {
co_return keyspace_rf_change_kind::default_rf_change;
}
if (co_await check_needs_colocation()) {
co_return keyspace_rf_change_kind::conversion_to_rack_list;
}
co_return keyspace_rf_change_kind::default_rf_change;
}
// Precondition: there is no node request and no ongoing topology transition
// (checked under the guard we're holding).
future<> handle_global_request(group0_guard guard) {
@@ -1015,9 +1049,18 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
saved_ks_props = *req_entry.new_keyspace_rf_change_data;
}
auto tbuilder_with_request_drop = [&] () {
topology_mutation_builder tbuilder(guard.write_timestamp());
tbuilder.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.del_global_topology_request()
.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
return tbuilder;
};
utils::chunked_vector<canonical_mutation> updates;
sstring error;
bool needs_colocation = false;
if (_db.has_keyspace(ks_name)) {
try {
auto& ks = _db.find_keyspace(ks_name);
@@ -1029,34 +1072,18 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
size_t unimportant_init_tablet_count = 2; // must be a power of 2
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
auto schedule_migrations = [&] () -> future<> {
auto tables_with_mvs = ks.metadata()->tables();
auto views = ks.metadata()->views();
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
auto rf_change_kind = co_await choose_keyspace_rf_change_kind(req_id, ks.metadata(), ks_md, tables_with_mvs);
switch (rf_change_kind) {
case keyspace_rf_change_kind::default_rf_change: {
if (!tables_with_mvs.empty()) {
auto table = tables_with_mvs.front();
auto tablet_count = tmptr->tablets().get_tablet_map(table->id()).tablet_count();
locator::replication_strategy_params params{ks_md->strategy_options(), tablet_count, ks.metadata()->consistency_option()};
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, tmptr->get_topology());
auto check_needs_colocation = [&] () -> future<bool> {
const auto& new_replication_strategy_config = new_strategy->get_config_options();
const auto& old_replication_strategy_config = ks.metadata()->strategy_options();
bool rack_list_conversion = false;
for (const auto& [dc, rf_value] : new_replication_strategy_config) {
if (std::holds_alternative<locator::rack_list>(rf_value)) {
auto it = old_replication_strategy_config.find(dc);
if (it != old_replication_strategy_config.end() && std::holds_alternative<sstring>(it->second)) {
rack_list_conversion = true;
break;
}
}
}
co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, tmptr, &_sys_ks, req_id) : false;
};
if (needs_colocation = co_await check_needs_colocation(); needs_colocation) {
co_return;
}
for (const auto& table_or_mv : tables_with_mvs) {
if (!tmptr->tablets().is_base_table(table_or_mv->id())) {
// Apply the transition only on base tables.
@@ -1103,8 +1130,21 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
for (auto& m: schema_muts) {
updates.emplace_back(m);
}
};
co_await schedule_migrations();
updates.push_back(canonical_mutation(tbuilder_with_request_drop().build()));
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
.done()
.build()));
break;
}
case keyspace_rf_change_kind::conversion_to_rack_list: {
rtlogger.info("keyspace_rf_change for keyspace {} postponed for colocation", ks_name);
topology_mutation_builder tbuilder = tbuilder_with_request_drop();
tbuilder.pause_rf_change_request(req_id);
updates.push_back(canonical_mutation(tbuilder.build()));
break;
}
}
} catch (const std::exception& e) {
error = e.what();
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, desired new ks opts: {}, error: {}",
@@ -1115,22 +1155,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist";
}
bool pause_request = needs_colocation && error.empty();
topology_mutation_builder tbuilder(guard.write_timestamp());
tbuilder.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.del_global_topology_request()
.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
if (pause_request) {
rtlogger.info("keyspace_rf_change for keyspace {} postponed for colocation", ks_name);
tbuilder.pause_rf_change_request(req_id);
} else {
if (error != "") {
updates.push_back(canonical_mutation(tbuilder_with_request_drop().build()));
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
.done(error)
.build()));
}
updates.push_back(canonical_mutation(tbuilder.build()));
sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props);
rtlogger.trace("do update {} reason {}", updates, reason);