storage_service: topology_coordinator: introduce cleanup REST API integrated with the topology coordinator

Introduce new REST API "/storage_service/cleanup_all"
that, when triggered, instructs the topology coordinator to initiate
cluster wide cleanup on all dirty nodes. It is done by introducing new
global command "global_topology_request::cleanup".
This commit is contained in:
Gleb Natapov
2023-12-05 17:18:54 +02:00
parent 0adb3904d8
commit 97ab3f6622
8 changed files with 103 additions and 11 deletions

View File

@@ -797,6 +797,21 @@
}
]
},
{
"path":"/storage_service/cleanup_all",
"operations":[
{
"method":"POST",
"summary":"Trigger a global cluster cleanup trough the topology coordinator",
"type":"long",
"nickname":"cleanup_all",
"produces":[
"application/json"
],
"parameters":[]
}
]
},
{
"path":"/storage_service/keyspace_offstrategy_compaction/{keyspace}",
"operations":[

View File

@@ -787,6 +787,16 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
co_return json::json_return_type(0);
});
// REST handler for POST /storage_service/cleanup_all.
// Forwards the request to shard 0 and asks the topology coordinator to run
// a cluster-wide cleanup; returns 0 (JSON) once the cleanup has completed.
ss::cleanup_all.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
// Coordinator-driven cleanup only works when raft-based topology
// management is enabled; otherwise the client must fall back to the
// per-keyspace cleanup API.
if (!ss.is_topology_coordinator_enabled()) {
throw std::runtime_error("Cannot run cleanup through the topology coordinator because it is disabled");
}
return ss.do_cluster_cleanup();
});
co_return json::json_return_type(0);
});
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
bool res = false;
@@ -1491,6 +1501,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::force_compaction.unset(r);
ss::force_keyspace_compaction.unset(r);
ss::force_keyspace_cleanup.unset(r);
ss::cleanup_all.unset(r);
ss::perform_keyspace_offstrategy_compaction.unset(r);
ss::upgrade_sstables.unset(r);
ss::force_flush.unset(r);

View File

@@ -1777,6 +1777,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
}
break;
case global_topology_request::cleanup:
co_await start_cleanup_on_dirty_nodes(std::move(guard), true);
break;
}
}
@@ -2262,7 +2265,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard));
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), false);
co_return true;
}
@@ -3005,11 +3008,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
return muts;
}
future<> start_cleanup_on_dirty_nodes(group0_guard guard) {
future<> start_cleanup_on_dirty_nodes(group0_guard guard, bool global_request) {
auto& topo = _topo_sm._topology;
std::vector<canonical_mutation> muts;
muts.reserve(topo.normal_nodes.size());
muts.reserve(topo.normal_nodes.size() + size_t(global_request));
if (global_request) {
topology_mutation_builder builder(guard.write_timestamp());
builder.del_global_topology_request();
muts.emplace_back(builder.build());
}
for (auto& [id, rs] : topo.normal_nodes) {
if (rs.cleanup == cleanup_status::needed) {
topology_mutation_builder builder(guard.write_timestamp());
@@ -4778,6 +4786,10 @@ future<> storage_service::uninit_address_map() {
return _gossiper.unregister_(_raft_ip_address_updater);
}
// Returns true when raft-managed topology changes are enabled, i.e. the
// topology coordinator is available to service cluster-wide requests such
// as the global cleanup triggered by do_cluster_cleanup().
bool storage_service::is_topology_coordinator_enabled() const {
return _raft_topology_change_enabled;
}
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {
assert(this_shard_id() == 0);
@@ -6323,6 +6335,55 @@ future<> storage_service::do_drain() {
co_await _repair.invoke_on_all(&repair_service::shutdown);
}
// Request a cluster-wide cleanup through the topology coordinator and wait
// for it to complete.
//
// Commits a global_topology_request::cleanup entry through group0 (retrying
// on concurrent modification), then blocks until every normal node reports
// cleanup_status::clean.
//
// Throws std::runtime_error if a different global topology request is
// already pending, if the local node is not a member of the cluster, or if
// the local node is not in the normal state.
future<> storage_service::do_cluster_cleanup() {
auto& raft_server = _group0->group0_server();
while (true) {
// The group0 guard serializes this request with other topology operations.
auto guard = co_await _group0->client().start_operation(&_group0_as);
auto curr_req = _topology_state_machine._topology.global_request;
// A pending request equal to cleanup is not an error: re-submitting the
// same request below is harmless, and we proceed to wait for completion.
if (curr_req && *curr_req != global_topology_request::cleanup) {
// FIXME: replace this with a queue
throw std::runtime_error{
"topology coordinator: cluster cleanup: a different topology request is already pending, try again later"};
}
auto it = _topology_state_machine._topology.find(raft_server.id());
if (!it) {
throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
}
const auto& rs = it->second;
if (rs.state != node_state::normal) {
throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
}
slogger.info("raft topology: cluster cleanup requested");
// Record the global cleanup request in the topology state via group0.
topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::cleanup);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
// Someone else committed a group0 entry first; take a fresh guard and retry.
slogger.info("raft topology: cleanup: concurrent operation is detected, retrying.");
continue;
}
break;
}
// Wait until cleanup finishes on all nodes, i.e. every normal node's
// cleanup status becomes clean. The state machine event wakes us on each
// topology state change.
co_await _topology_state_machine.event.when([this] {
return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
return n.second.cleanup == cleanup_status::clean;
});
});
slogger.info("raft topology: cluster cleanup done");
}
future<> storage_service::raft_rebuild(sstring source_dc) {
auto& raft_server = _group0->group0_server();

View File

@@ -339,6 +339,7 @@ public:
future<> init_address_map(raft_address_map& address_map);
future<> uninit_address_map();
bool is_topology_coordinator_enabled() const;
future<> drain_on_shutdown();
@@ -780,6 +781,7 @@ public:
// Public for `reload_raft_topology_state` REST API.
future<> topology_transition();
future<> do_cluster_cleanup();
public:
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst);
future<> set_tablet_balancing_enabled(bool);

View File

@@ -177,6 +177,7 @@ topology_request topology_request_from_string(const sstring& s) {
// Maps each global_topology_request value to its textual name, used for
// logging/formatting (see the operator<< overload below).
static std::unordered_map<global_topology_request, sstring> global_topology_request_to_name_map = {
{global_topology_request::new_cdc_generation, "new_cdc_generation"},
{global_topology_request::cleanup, "cleanup"},
};
std::ostream& operator<<(std::ostream& os, const global_topology_request& req) {

View File

@@ -78,6 +78,7 @@ using request_param = std::variant<join_param, rebuild_param, removenode_param,
// Topology requests that apply to the cluster as a whole rather than to a
// single node.
enum class global_topology_request: uint16_t {
new_cdc_generation,
// Cluster-wide cleanup of all dirty nodes, triggered via the
// /storage_service/cleanup_all REST API.
cleanup,
};
struct ring_slice {

View File

@@ -10,12 +10,7 @@ import utils
def test_cleanup(nodetool):
    """nodetool cleanup issues a single cluster-wide cleanup request.

    The command now POSTs once to /storage_service/cleanup_all (handled by
    the topology coordinator) instead of enumerating keyspaces and invoking
    the per-keyspace cleanup endpoint for each one.
    """
    # The rendered diff left the stale per-keyspace GET/POST expectations in
    # place; only the cleanup_all request is expected after this change.
    nodetool("cleanup", expected_requests=[
        expected_request("POST", "/storage_service/cleanup_all", response=0),
    ])

View File

@@ -173,8 +173,14 @@ void cleanup_operation(scylla_rest_client& client, const bpo::variables_map& vm)
}
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace), std::move(params));
} else {
for (const auto& keyspace : get_keyspaces(client, "non_local_strategy")) {
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace));
auto res = client.post(format("/storage_service/cleanup_all"));
if (res.IsObject()) {
// If previous post returns an object instead of a simple value it means the request
// failed either because server does not support it yet or topology coordinator is disabled.
// Fall back to the old cleanup API.
for (const auto& keyspace : get_keyspaces(client, "non_local_strategy")) {
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace));
}
}
}
}