db: service: add ongoing_rf_changes to system.topology

Following changes, will allow adding or removing all keyspace
replicas in a DC with a single ALTER KEYSPACE. For such operations,
the tablet load balancer needs to schedule rebuilds. To track
which RF change requests require rebuilds, we maintain a vector
of RF changes along with their ongoing rebuild phases.

Add a new ongoing_rf_changes column to system.topology to keep track
of those requests.

In this commit no data is kept in the new column.
This commit is contained in:
Aleksandra Martyniuk
2026-01-13 14:00:36 +01:00
parent 7cdf7d62a2
commit 751af38f2a
5 changed files with 32 additions and 0 deletions

View File

@@ -299,6 +299,7 @@ schema_ptr system_keyspace::topology() {
.with_column("upgrade_state", utf8_type, column_kind::static_column)
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
.with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
.with_column("ongoing_rf_changes", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
.set_comment("Current state of topology change machine")
.with_hash_version()
.build();
@@ -3288,6 +3289,12 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
}
}
if (some_row.has("ongoing_rf_changes")) {
for (auto&& v : deserialize_set_column(*topology(), some_row, "ongoing_rf_changes")) {
ret.ongoing_rf_changes.insert(value_cast<utils::UUID>(v));
}
}
if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}

View File

@@ -1339,6 +1339,11 @@ future<bool> storage_service::ongoing_rf_change(const group0_guard& guard, sstri
co_return true;
}
}
for (auto request_id : _topology_state_machine._topology.ongoing_rf_changes) {
if (co_await ongoing_ks_rf_change(request_id)) {
co_return true;
}
}
co_return false;
}

View File

@@ -283,6 +283,20 @@ topology_mutation_builder& topology_mutation_builder::resume_rf_change_request(c
}
}
topology_mutation_builder& topology_mutation_builder::start_rf_change_migrations(const utils::UUID& id) {
return apply_set("ongoing_rf_changes", collection_apply_mode::update, std::vector<data_value>{id});
}
topology_mutation_builder& topology_mutation_builder::finish_rf_change_migrations(const std::unordered_set<utils::UUID>& values, const utils::UUID& id) {
if (values.contains(id)) {
auto new_values = values;
new_values.erase(id);
return apply_set("ongoing_rf_changes", collection_apply_mode::overwrite, new_values | std::views::transform([] (const auto& id) { return data_value{id}; }));
} else {
return *this;
}
}
topology_mutation_builder& topology_mutation_builder::set_upgrade_state_done() {
return apply_atomic("upgrade_state", "done");
}

View File

@@ -132,6 +132,8 @@ public:
topology_mutation_builder& drop_first_global_topology_request_id(const std::vector<utils::UUID>&, const utils::UUID&);
topology_mutation_builder& pause_rf_change_request(const utils::UUID&);
topology_mutation_builder& resume_rf_change_request(const std::unordered_set<utils::UUID>&, const utils::UUID&);
topology_mutation_builder& start_rf_change_migrations(const utils::UUID&);
topology_mutation_builder& finish_rf_change_migrations(const std::unordered_set<utils::UUID>&, const utils::UUID&);
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};

View File

@@ -208,6 +208,10 @@ struct topology {
// It may happen during altering from numerical RF to rack list.
std::unordered_set<utils::UUID> paused_rf_change_requests;
// The ids of ongoing RF change requests.
// Here we keep the ids only for rf-changes using rack_lists.
std::unordered_set<utils::UUID> ongoing_rf_changes;
// The IDs of the committed yet unpublished CDC generations sorted by timestamps.
std::vector<cdc::generation_id> unpublished_cdc_generations;