diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 66497a645b..b3d8f14ed3 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -299,6 +299,7 @@ schema_ptr system_keyspace::topology() { .with_column("upgrade_state", utf8_type, column_kind::static_column) .with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column) .with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column) + .with_column("ongoing_rf_changes", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column) .set_comment("Current state of topology change machine") .with_hash_version() .build(); @@ -3288,6 +3289,12 @@ future system_keyspace::load_topology_state(const std::unorde } } + if (some_row.has("ongoing_rf_changes")) { + for (auto&& v : deserialize_set_column(*topology(), some_row, "ongoing_rf_changes")) { + ret.ongoing_rf_changes.insert(value_cast(v)); + } + } + if (some_row.has("enabled_features")) { ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features")); } diff --git a/service/storage_service.cc b/service/storage_service.cc index bc13b01773..7de108bb6f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1339,6 +1339,11 @@ future storage_service::ongoing_rf_change(const group0_guard& guard, sstri co_return true; } } + for (auto request_id : _topology_state_machine._topology.ongoing_rf_changes) { + if (co_await ongoing_ks_rf_change(request_id)) { + co_return true; + } + } co_return false; } diff --git a/service/topology_mutation.cc b/service/topology_mutation.cc index 0674921385..92b44258d1 100644 --- a/service/topology_mutation.cc +++ b/service/topology_mutation.cc @@ -283,6 +283,20 @@ topology_mutation_builder& topology_mutation_builder::resume_rf_change_request(c } } +topology_mutation_builder& topology_mutation_builder::start_rf_change_migrations(const utils::UUID& id) { + return apply_set("ongoing_rf_changes", collection_apply_mode::update, std::vector{id}); +} + +topology_mutation_builder& topology_mutation_builder::finish_rf_change_migrations(const std::unordered_set& values, const utils::UUID& id) { + if (values.contains(id)) { + auto new_values = values; + new_values.erase(id); + return apply_set("ongoing_rf_changes", collection_apply_mode::overwrite, new_values | std::views::transform([] (const auto& id) { return data_value{id}; })); + } else { + return *this; + } +} + topology_mutation_builder& topology_mutation_builder::set_upgrade_state_done() { return apply_atomic("upgrade_state", "done"); } diff --git a/service/topology_mutation.hh b/service/topology_mutation.hh index b041a20565..51fe8a3bfd 100644 --- a/service/topology_mutation.hh +++ b/service/topology_mutation.hh @@ -132,6 +132,8 @@ public: topology_mutation_builder& drop_first_global_topology_request_id(const std::vector&, const utils::UUID&); topology_mutation_builder& pause_rf_change_request(const utils::UUID&); topology_mutation_builder& resume_rf_change_request(const std::unordered_set&, const utils::UUID&); + topology_mutation_builder& start_rf_change_migrations(const utils::UUID&); + topology_mutation_builder& finish_rf_change_migrations(const std::unordered_set&, const utils::UUID&); topology_node_mutation_builder& with_node(raft::server_id); canonical_mutation build() { return canonical_mutation{std::move(_m)}; } }; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index ba8594dfba..bb7387c60c 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -208,6 +208,10 @@ struct topology { // It may happen during altering from numerical RF to rack list. std::unordered_set paused_rf_change_requests; + // The ids of ongoing RF change requests. + // Here we keep the ids only for rf-changes using rack_lists. + std::unordered_set ongoing_rf_changes; + // The IDs of the committed yet unpublished CDC generations sorted by timestamps. std::vector unpublished_cdc_generations;