topology_coordinator: small start_cleanup refactoring

Rename start_cleanup -> start_vnodes_cleanup for clarity.
Pass topology_request and server_id in start_vnodes_cleanup; we will
need them for better logging later.
Petr Gusev
2025-09-27 14:26:20 +02:00
parent d53e24812f
commit 1b791dacde
3 changed files with 24 additions and 14 deletions
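
The shape of the refactoring is easiest to see in isolation. Below is a minimal, compilable sketch of the pattern the commit applies; it is not ScyllaDB code: guard_t and server_id are stand-ins for group0_guard and raft::server_id, and the variant alternative is renamed while gaining the two fields the handler can later log.

#include <cstdint>
#include <cstdio>
#include <variant>

enum class topology_request { join, leave, remove, replace };
struct guard_t { std::uint64_t ts; };   // stand-in for group0_guard
using server_id = std::uint64_t;        // stand-in for raft::server_id

// Before: struct start_cleanup { guard_t guard; };
// After: the alternative is renamed and carries the request context.
struct start_vnodes_cleanup {
    guard_t guard;
    topology_request request;           // new: kept for logging
    server_id request_server_id;        // new: kept for logging
};

using task = std::variant<guard_t, start_vnodes_cleanup>;

void handle(task work) {
    if (auto* cleanup = std::get_if<start_vnodes_cleanup>(&work)) {
        // The extra fields allow log lines that say what triggered the cleanup.
        std::printf("cleanup for request %d from server %llu\n",
                    static_cast<int>(cleanup->request),
                    static_cast<unsigned long long>(cleanup->request_server_id));
    }
}

int main() {
    handle(start_vnodes_cleanup{guard_t{1}, topology_request::remove, 42});
}

Carrying the request context inside the variant alternative keeps get_next_task's return value self-describing: the consumer does not have to re-derive which request triggered the cleanup.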

service/topology_coordinator.cc

@@ -211,8 +211,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
         std::unordered_set<raft::server_id> dead_nodes;
     };
 
-    struct start_cleanup {
+    // Request to start a cluster-wide cleanup of vnodes-based tables on nodes marked
+    // as cleanup_needed. This step is required before removenode or decommission
+    // topology operations.
+    struct start_vnodes_cleanup {
         group0_guard guard;
+        topology_request request;
+        raft::server_id request_server_id;
     };
 
     // Return dead nodes
@@ -259,9 +264,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
     // Returns:
     // guard - there is nothing to do.
     // cancel_requests - no request can be started so cancel the queue
-    // start_cleanup - cleanup needs to be started
+    // start_vnodes_cleanup - cleanup needs to be started
    // node_to_work_on - the node the topology coordinator should work on
-    std::variant<group0_guard, cancel_requests, start_cleanup, node_to_work_on> get_next_task(group0_guard guard) {
+    std::variant<group0_guard, cancel_requests, start_vnodes_cleanup, node_to_work_on> get_next_task(group0_guard guard) {
         auto& topo = _topo_sm._topology;
 
         if (topo.transition_nodes.size() != 0) {
@@ -313,7 +318,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
 
         if (cleanup_needed && (req == topology_request::remove || req == topology_request::leave)) {
             // If the highest prio request is removenode or decommission we need to start cleanup if one is needed
-            return start_cleanup(std::move(guard));
+            return start_vnodes_cleanup(std::move(guard), req, id);
         }
 
         return node_to_work_on(std::move(guard), &topo, id, &topo.find(id)->second, req, get_request_param(id));
@@ -2039,7 +2044,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
             return std::make_pair(true, std::move(cancel->guard));
         }
 
-        if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
+        if (auto* cleanup = std::get_if<start_vnodes_cleanup>(&work)) {
             // cleanup has to be started
             return std::make_pair(true, std::move(cleanup->guard));
         }
@@ -2144,8 +2149,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
             co_return true;
         }
 
-        if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
-            co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), utils::UUID{});
+        if (auto* cleanup = std::get_if<start_vnodes_cleanup>(&work)) {
+            co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard),
+                    std::pair(cleanup->request, cleanup->request_server_id));
             co_return true;
         }
@@ -3068,23 +3074,26 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
         return muts;
     }
 
-    future<> start_cleanup_on_dirty_nodes(group0_guard guard, utils::UUID global_request_id) {
+    future<> start_cleanup_on_dirty_nodes(group0_guard guard, std::variant<std::pair<topology_request, raft::server_id>, utils::UUID> cmd) {
         auto& topo = _topo_sm._topology;
         utils::chunked_vector<canonical_mutation> muts;
-        muts.reserve(topo.normal_nodes.size() + size_t(bool(global_request_id)));
-        if (global_request_id) {
+
+        if (const auto* global_request_id = std::get_if<utils::UUID>(&cmd)) {
+            muts.reserve(topo.normal_nodes.size() + 2);
             topology_mutation_builder builder(guard.write_timestamp());
             builder.del_global_topology_request();
             if (_feature_service.topology_global_request_queue) {
-                topology_request_tracking_mutation_builder rtbuilder(global_request_id);
+                topology_request_tracking_mutation_builder rtbuilder(*global_request_id);
                 builder.del_global_topology_request_id()
-                       .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, global_request_id);
+                       .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, *global_request_id);
                 rtbuilder.done();
                 muts.emplace_back(rtbuilder.build());
             }
             muts.emplace_back(builder.build());
+        } else {
+            muts.reserve(topo.normal_nodes.size());
         }
 
         for (auto& [id, rs] : topo.normal_nodes) {
             if (rs.cleanup == cleanup_status::needed) {
                 topology_mutation_builder builder(guard.write_timestamp());
@@ -3093,6 +3102,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
                 rtlogger.debug("mark node {} as cleanup running", id);
             }
         }
+
         if (!muts.empty()) {
             co_await update_topology_state(std::move(guard), std::move(muts), "Starting cleanup");
         }

service/topology_mutation.cc

@@ -248,7 +248,7 @@ topology_mutation_builder& topology_mutation_builder::queue_global_topology_requ
     return apply_set("global_requests", collection_apply_mode::update, std::vector<data_value>{value});
 }
 
-topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector<utils::UUID>& values, utils::UUID& id) {
+topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector<utils::UUID>& values, const utils::UUID& id) {
     if (!values.empty() && values[0] == id) {
         return apply_set("global_requests", collection_apply_mode::overwrite, std::span(values.begin() + 1, values.size() - 1));
     } else {
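
The body of drop_first_global_topology_request_id, restated as a self-contained sketch that returns the new queue instead of applying a mutation (uuid stands in for utils::UUID): the head is dropped only when it matches the id being retired, and the overwrite value is built from a std::span over the tail.

#include <cassert>
#include <cstdint>
#include <span>
#include <vector>

using uuid = std::uint64_t;  // stand-in for utils::UUID

std::vector<uuid> drop_first_request_id(const std::vector<uuid>& values, const uuid& id) {
    if (!values.empty() && values[0] == id) {
        // Overwrite the stored queue with everything past the head.
        std::span tail(values.begin() + 1, values.size() - 1);
        return {tail.begin(), tail.end()};
    }
    // Head does not match the retired id: leave the queue untouched.
    return values;
}

int main() {
    std::vector<uuid> q{1, 2, 3};
    assert((drop_first_request_id(q, 1) == std::vector<uuid>{2, 3}));
    assert(drop_first_request_id(q, 9) == q);
}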

service/topology_mutation.hh

@@ -128,7 +128,7 @@ public:
     topology_mutation_builder& del_global_topology_request();
     topology_mutation_builder& del_global_topology_request_id();
     topology_mutation_builder& queue_global_topology_request_id(const utils::UUID& value);
-    topology_mutation_builder& drop_first_global_topology_request_id(const std::vector<utils::UUID>&, utils::UUID&);
+    topology_mutation_builder& drop_first_global_topology_request_id(const std::vector<utils::UUID>&, const utils::UUID&);
     topology_node_mutation_builder& with_node(raft::server_id);
     canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
 };
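
The header change to const utils::UUID& is required rather than cosmetic: the new call site declares the result of std::get_if as a pointer-to-const and dereferences it, and a const lvalue cannot bind to the old non-const reference. A minimal illustration, with uuid as a stand-in for utils::UUID:

#include <cstdint>
#include <cstdio>
#include <variant>

using uuid = std::uint64_t;  // stand-in for utils::UUID

// New signature: a const reference binds the const lvalue at the call site.
void drop_first(const uuid& id) {
    std::printf("dropping %llu\n", static_cast<unsigned long long>(id));
}
// Old signature, void drop_first(uuid&), would fail to compile below.

int main() {
    std::variant<int, uuid> cmd = uuid{42};
    if (const auto* id = std::get_if<uuid>(&cmd)) {  // pointer-to-const by choice
        drop_first(*id);  // *id is a const lvalue; only const uuid& can bind it
    }
}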