From 1b791dacdefa46be37ca5c021db919ae455da84e Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Sat, 27 Sep 2025 14:26:20 +0200 Subject: [PATCH] topology_coordinator: small start_cleanup refactoring Rename start_cleanup -> start_vnodes_cleanup for clarity. Pass topology_request and server_id in start_vnodes_cleanup, we will need them for better logging later. --- service/topology_coordinator.cc | 34 +++++++++++++++++++++------------ service/topology_mutation.cc | 2 +- service/topology_mutation.hh | 2 +- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index bdf081bb06..4a6217fe6c 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -211,8 +211,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { std::unordered_set dead_nodes; }; - struct start_cleanup { + // Request to start a cluster-wide cleanup of vnodes-based tables on nodes marked + // as cleanup_need. This step is required before removenode or decommission + // topology operations. + struct start_vnodes_cleanup { group0_guard guard; + topology_request request; + raft::server_id request_server_id; }; // Return dead nodes @@ -259,9 +264,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Returns: // guard - there is nothing to do. // cancel_requests - no request can be started so cancel the queue - // start_cleanup - cleanup needs to be started + // start_vnodes_cleanup - cleanup needs to be started // node_to_work_on - the node the topology coordinator should work on - std::variant get_next_task(group0_guard guard) { + std::variant get_next_task(group0_guard guard) { auto& topo = _topo_sm._topology; if (topo.transition_nodes.size() != 0) { @@ -313,7 +318,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { if (cleanup_needed && (req == topology_request::remove || req == topology_request::leave)) { // If the highest prio request is removenode or decommission we need to start cleanup if one is needed - return start_cleanup(std::move(guard)); + return start_vnodes_cleanup(std::move(guard), req, id); } return node_to_work_on(std::move(guard), &topo, id, &topo.find(id)->second, req, get_request_param(id)); @@ -2039,7 +2044,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { return std::make_pair(true, std::move(cancel->guard)); } - if (auto* cleanup = std::get_if(&work)) { + if (auto* cleanup = std::get_if(&work)) { // cleanup has to be started return std::make_pair(true, std::move(cleanup->guard)); } @@ -2144,8 +2149,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_return true; } - if (auto* cleanup = std::get_if(&work)) { - co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), utils::UUID{}); + if (auto* cleanup = std::get_if(&work)) { + co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), + std::pair(cleanup->request, cleanup->request_server_id)); co_return true; } @@ -3068,23 +3074,26 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { return muts; } - future<> start_cleanup_on_dirty_nodes(group0_guard guard, utils::UUID global_request_id) { + future<> start_cleanup_on_dirty_nodes(group0_guard guard, std::variant, utils::UUID> cmd) { auto& topo = _topo_sm._topology; utils::chunked_vector muts; - muts.reserve(topo.normal_nodes.size() + size_t(bool(global_request_id))); - if (global_request_id) { + if (const auto* global_request_id = std::get_if(&cmd)) { + muts.reserve(topo.normal_nodes.size() + 2); topology_mutation_builder builder(guard.write_timestamp()); builder.del_global_topology_request(); if (_feature_service.topology_global_request_queue) { - topology_request_tracking_mutation_builder rtbuilder(global_request_id); + topology_request_tracking_mutation_builder rtbuilder(*global_request_id); builder.del_global_topology_request_id() - .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, global_request_id); + .drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, *global_request_id); rtbuilder.done(); muts.emplace_back(rtbuilder.build()); } muts.emplace_back(builder.build()); + } else { + muts.reserve(topo.normal_nodes.size()); } + for (auto& [id, rs] : topo.normal_nodes) { if (rs.cleanup == cleanup_status::needed) { topology_mutation_builder builder(guard.write_timestamp()); @@ -3093,6 +3102,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtlogger.debug("mark node {} as cleanup running", id); } } + if (!muts.empty()) { co_await update_topology_state(std::move(guard), std::move(muts), "Starting cleanup"); } diff --git a/service/topology_mutation.cc b/service/topology_mutation.cc index cb1f7bd0cc..7d234f48cf 100644 --- a/service/topology_mutation.cc +++ b/service/topology_mutation.cc @@ -248,7 +248,7 @@ topology_mutation_builder& topology_mutation_builder::queue_global_topology_requ return apply_set("global_requests", collection_apply_mode::update, std::vector{value}); } -topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector& values, utils::UUID& id) { +topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector& values, const utils::UUID& id) { if (!values.empty() && values[0] == id) { return apply_set("global_requests", collection_apply_mode::overwrite, std::span(values.begin() + 1, values.size() - 1)); } else { diff --git a/service/topology_mutation.hh b/service/topology_mutation.hh index cc1b651aff..b4b0cc4092 100644 --- a/service/topology_mutation.hh +++ b/service/topology_mutation.hh @@ -128,7 +128,7 @@ public: topology_mutation_builder& del_global_topology_request(); topology_mutation_builder& del_global_topology_request_id(); topology_mutation_builder& queue_global_topology_request_id(const utils::UUID& value); - topology_mutation_builder& drop_first_global_topology_request_id(const std::vector&, utils::UUID&); + topology_mutation_builder& drop_first_global_topology_request_id(const std::vector&, const utils::UUID&); topology_node_mutation_builder& with_node(raft::server_id); canonical_mutation build() { return canonical_mutation{std::move(_m)}; } };