diff --git a/service/storage_service.cc b/service/storage_service.cc index dcb69279c3..5d99227db9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -918,6 +918,29 @@ class topology_coordinator { co_return std::pair{gen_uuid, std::move(gen_mutations)}; } + // Precondition: there is no node request and no ongoing topology transition + // (checked under the guard we're holding). + future<> handle_global_request(group0_guard guard) { + switch (_topo_sm._topology.global_request.value()) { + case global_topology_request::new_cdc_generation: { + slogger.info("raft topology: new CDC generation requested"); + + auto tmptr = get_token_metadata_ptr(); + auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, std::nullopt); + + std::vector updates{gen_mutations.begin(), gen_mutations.end()}; + topology_mutation_builder builder(guard.write_timestamp()); + builder.set_transition_state(topology::transition_state::commit_cdc_generation) + .set_new_cdc_generation_data_uuid(gen_uuid); + updates.push_back(builder.build()); + auto reason = ::format( + "insert CDC generation data (UUID: {})", gen_uuid); + co_await update_topology_state(std::move(guard), {std::move(updates)}, reason); + } + break; + } + } + // Returns `true` iff there was work to do. future handle_topology_transition(group0_guard guard) { auto tstate = _topo_sm._topology.tstate; @@ -927,6 +950,12 @@ class topology_coordinator { co_await handle_node_transition(std::move(*node)); co_return true; } + + guard = std::get(std::move(node_or_guard)); + if (_topo_sm._topology.global_request) { + co_await handle_global_request(std::move(guard)); + co_return true; + } co_return false; }