diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json
index 68962bd227..6f946f0796 100644
--- a/api/api-doc/storage_service.json
+++ b/api/api-doc/storage_service.json
@@ -797,6 +797,21 @@
                 }
             ]
         },
+        {
+            "path":"/storage_service/cleanup_all",
+            "operations":[
+                {
+                    "method":"POST",
+                    "summary":"Trigger a global cluster cleanup through the topology coordinator",
+                    "type":"long",
+                    "nickname":"cleanup_all",
+                    "produces":[
+                        "application/json"
+                    ],
+                    "parameters":[]
+                }
+            ]
+        },
         {
             "path":"/storage_service/keyspace_offstrategy_compaction/{keyspace}",
             "operations":[
diff --git a/api/storage_service.cc b/api/storage_service.cc
index dee4cc2214..9e10bca40c 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -787,6 +787,16 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss
+    ss::cleanup_all.set(r, [&ss] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
+        co_await ss.invoke_on(0, [] (service::storage_service& ss) {
+            if (!ss.is_topology_coordinator_enabled()) {
+                throw std::runtime_error("Cannot run cleanup through the topology coordinator because it is disabled");
+            }
+            return ss.do_cluster_cleanup();
+        });
+        co_return json::json_return_type(0);
+    });
+
     ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
         apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
         bool res = false;
@@ -1491,6 +1501,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
     ss::force_compaction.unset(r);
     ss::force_keyspace_compaction.unset(r);
     ss::force_keyspace_cleanup.unset(r);
+    ss::cleanup_all.unset(r);
     ss::perform_keyspace_offstrategy_compaction.unset(r);
     ss::upgrade_sstables.unset(r);
     ss::force_flush.unset(r);
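The handler above is deliberately thin: it always hops to shard 0, where the topology state machine and the group 0 client live, refuses the request when raft-based topology changes are disabled, and otherwise blocks until `do_cluster_cleanup()` resolves, returning `0` as the endpoint's declared `long` result. Since the operation takes no parameters, it can be exercised with a plain POST, e.g. `curl -X POST http://localhost:10000/storage_service/cleanup_all` (assuming Scylla's default REST API address and port).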
diff --git a/service/storage_service.cc b/service/storage_service.cc
index ea454fb68a..eec64432d3 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1777,6 +1777,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
                     "insert CDC generation data (UUID: {})", gen_uuid);
                 co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
             }
+            break;
+        case global_topology_request::cleanup:
+            co_await start_cleanup_on_dirty_nodes(std::move(guard), true);
             break;
         }
     }
@@ -2262,7 +2265,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
         }
 
         if (auto* cleanup = std::get_if(&work)) {
-            co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard));
+            co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), false);
             co_return true;
         }
 
@@ -3005,11 +3008,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
         return muts;
     }
 
-    future<> start_cleanup_on_dirty_nodes(group0_guard guard) {
+    future<> start_cleanup_on_dirty_nodes(group0_guard guard, bool global_request) {
         auto& topo = _topo_sm._topology;
         std::vector<canonical_mutation> muts;
-        muts.reserve(topo.normal_nodes.size());
+        muts.reserve(topo.normal_nodes.size() + size_t(global_request));
+        if (global_request) {
+            topology_mutation_builder builder(guard.write_timestamp());
+            builder.del_global_topology_request();
+            muts.emplace_back(builder.build());
+        }
         for (auto& [id, rs] : topo.normal_nodes) {
             if (rs.cleanup == cleanup_status::needed) {
                 topology_mutation_builder builder(guard.write_timestamp());
@@ -4778,6 +4786,10 @@ future<> storage_service::uninit_address_map() {
     return _gossiper.unregister_(_raft_ip_address_updater);
}

+bool storage_service::is_topology_coordinator_enabled() const {
+    return _raft_topology_change_enabled;
+}
+
 future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {
     assert(this_shard_id() == 0);

@@ -6323,6 +6335,55 @@ future<> storage_service::do_drain() {
     co_await _repair.invoke_on_all(&repair_service::shutdown);
}

+future<> storage_service::do_cluster_cleanup() {
+    auto& raft_server = _group0->group0_server();
+
+    while (true) {
+        auto guard = co_await _group0->client().start_operation(&_group0_as);
+
+        auto curr_req = _topology_state_machine._topology.global_request;
+        if (curr_req && *curr_req != global_topology_request::cleanup) {
+            // FIXME: replace this with a queue
+            throw std::runtime_error{
+                "topology coordinator: cluster cleanup: a different topology request is already pending, try again later"};
+        }
+
+        auto it = _topology_state_machine._topology.find(raft_server.id());
+        if (!it) {
+            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
+        }
+
+        const auto& rs = it->second;
+
+        if (rs.state != node_state::normal) {
+            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
+        }
+
+        slogger.info("raft topology: cluster cleanup requested");
+        topology_mutation_builder builder(guard.write_timestamp());
+        builder.set_global_topology_request(global_topology_request::cleanup);
+        topology_change change{{builder.build()}};
+        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
+
+        try {
+            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
+        } catch (group0_concurrent_modification&) {
+            slogger.info("raft topology: cleanup: concurrent operation detected, retrying.");
+            continue;
+        }
+        break;
+    }
+
+    // Wait until cleanup finishes on all nodes
+    co_await _topology_state_machine.event.when([this] {
+        return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
+            return n.second.cleanup == cleanup_status::clean;
+        });
+    });
+    slogger.info("raft topology: cluster cleanup done");
+}
+
 future<> storage_service::raft_rebuild(sstring source_dc) {
     auto& raft_server = _group0->group0_server();
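`do_cluster_cleanup()` follows the usual group 0 optimistic-concurrency pattern: take a guard (a snapshot of the replicated topology state), validate preconditions against that snapshot, propose the command, and restart from scratch whenever `group0_concurrent_modification` signals that a competing command was committed first. Below is a minimal, self-contained sketch of that control flow; `propose_cleanup_request` and the single-retry simulation are hypothetical stand-ins, not Scylla APIs.

```cpp
// Schematic model of the retry loop in do_cluster_cleanup(). All names here
// are hypothetical stand-ins -- this is not the Scylla group 0 client API.
#include <cstdio>
#include <exception>

struct group0_concurrent_modification : std::exception {};

static int attempts = 0;

// Simulates proposing a group 0 command: the first attempt loses the race.
void propose_cleanup_request() {
    if (attempts++ == 0) {
        throw group0_concurrent_modification{};
    }
}

int main() {
    while (true) {
        // 1. Take a fresh guard: a snapshot of the latest group 0 state.
        // 2. Re-check preconditions against that snapshot (no other pending
        //    global request, local node in the normal state).
        // 3. Propose the command; a concurrent commit invalidates the guard,
        //    so validation must start over from scratch.
        try {
            propose_cleanup_request();
        } catch (const group0_concurrent_modification&) {
            std::puts("concurrent operation detected, retrying");
            continue;
        }
        break;
    }
    std::puts("cleanup request committed");
}
```

Re-taking the guard on every iteration is what keeps the precondition checks sound: after a lost race the earlier snapshot may be stale, e.g. a different global request may have appeared in the meantime.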
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 380db87978..b29de90d21 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -339,6 +339,7 @@ public:
     future<> init_address_map(raft_address_map& address_map);
     future<> uninit_address_map();
+    bool is_topology_coordinator_enabled() const;

     future<> drain_on_shutdown();
@@ -780,6 +781,7 @@ public:
     // Public for `reload_raft_topology_state` REST API.
     future<> topology_transition();
+    future<> do_cluster_cleanup();
 public:
     future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst);
     future<> set_tablet_balancing_enabled(bool);
diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc
index d9c0b6e9eb..b361d0b0e4 100644
--- a/service/topology_state_machine.cc
+++ b/service/topology_state_machine.cc
@@ -177,6 +177,7 @@ topology_request topology_request_from_string(const sstring& s) {

 static std::unordered_map<global_topology_request, sstring> global_topology_request_to_name_map = {
     {global_topology_request::new_cdc_generation, "new_cdc_generation"},
+    {global_topology_request::cleanup, "cleanup"},
 };

 std::ostream& operator<<(std::ostream& os, const global_topology_request& req) {
diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh
index 381a45d72d..699fa549a3 100644
--- a/service/topology_state_machine.hh
+++ b/service/topology_state_machine.hh
@@ -78,6 +78,7 @@ using request_param = std::variant
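Once the request is committed, the caller does not return immediately: it waits on the state machine's `event` (a condition variable) until every normal node reports `cleanup_status::clean`. A small self-contained model of that completion predicate follows, using simplified stand-in types rather than the real topology structures.

```cpp
// Minimal model of the completion predicate do_cluster_cleanup() waits on:
// every normal node must report cleanup_status::clean. The types below are
// simplified stand-ins for the real topology state machine structures.
#include <algorithm>
#include <cassert>
#include <map>

enum class cleanup_status { clean, needed };  // the real enum may have more states

struct replica_state {
    cleanup_status cleanup = cleanup_status::clean;
};

bool all_nodes_clean(const std::map<int, replica_state>& normal_nodes) {
    return std::all_of(normal_nodes.begin(), normal_nodes.end(), [] (const auto& n) {
        return n.second.cleanup == cleanup_status::clean;
    });
}

int main() {
    std::map<int, replica_state> nodes{{1, {}}, {2, {cleanup_status::needed}}};
    assert(!all_nodes_clean(nodes));   // node 2 still needs cleanup
    nodes[2].cleanup = cleanup_status::clean;
    assert(all_nodes_clean(nodes));    // now the waiter would be released
}
```

Seastar's condition variable re-evaluates such a predicate each time the event is signalled, so the waiter is released only after a topology state change makes the cluster fully clean.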