raft topology: implement global request handling

The only possible request for now is creating a new CDC generation.
This commit is contained in:
Kamil Braun
2023-04-25 16:58:53 +02:00
parent 2bd333dd84
commit a09ed01ffa

View File

@@ -918,6 +918,29 @@ class topology_coordinator {
co_return std::pair{gen_uuid, std::move(gen_mutations)};
}
// Precondition: there is no node request and no ongoing topology transition
// (checked under the guard we're holding).
future<> handle_global_request(group0_guard guard) {
switch (_topo_sm._topology.global_request.value()) {
case global_topology_request::new_cdc_generation: {
slogger.info("raft topology: new CDC generation requested");
auto tmptr = get_token_metadata_ptr();
auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, std::nullopt);
std::vector<canonical_mutation> updates{gen_mutations.begin(), gen_mutations.end()};
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid);
updates.push_back(builder.build());
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(updates)}, reason);
}
break;
}
}
// Returns `true` iff there was work to do.
future<bool> handle_topology_transition(group0_guard guard) {
auto tstate = _topo_sm._topology.tstate;
@@ -927,6 +950,12 @@ class topology_coordinator {
co_await handle_node_transition(std::move(*node));
co_return true;
}
guard = std::get<group0_guard>(std::move(node_or_guard));
if (_topo_sm._topology.global_request) {
co_await handle_global_request(std::move(guard));
co_return true;
}
co_return false;
}