storage_service: topology_coordinator: introduce cleanup REST API integrated with the topology coordinator
Introduce a new REST API "/storage_service/cleanup_all" that, when triggered, instructs the topology coordinator to initiate a cluster-wide cleanup on all dirty nodes. This is done by introducing a new global command, "global_topology_request::cleanup".
This commit is contained in:
@@ -797,6 +797,21 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/cleanup_all",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Trigger a global cluster cleanup trough the topology coordinator",
|
||||
"type":"long",
|
||||
"nickname":"cleanup_all",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/keyspace_offstrategy_compaction/{keyspace}",
|
||||
"operations":[
|
||||
|
||||
@@ -787,6 +787,16 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
co_return json::json_return_type(0);
|
||||
});
|
||||
|
||||
ss::cleanup_all.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
|
||||
if (!ss.is_topology_coordinator_enabled()) {
|
||||
throw std::runtime_error("Cannot run cleanup through the topology coordinator because it is disabled");
|
||||
}
|
||||
return ss.do_cluster_cleanup();
|
||||
});
|
||||
co_return json::json_return_type(0);
|
||||
});
|
||||
|
||||
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
|
||||
bool res = false;
|
||||
@@ -1491,6 +1501,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
|
||||
ss::force_compaction.unset(r);
|
||||
ss::force_keyspace_compaction.unset(r);
|
||||
ss::force_keyspace_cleanup.unset(r);
|
||||
ss::cleanup_all.unset(r);
|
||||
ss::perform_keyspace_offstrategy_compaction.unset(r);
|
||||
ss::upgrade_sstables.unset(r);
|
||||
ss::force_flush.unset(r);
|
||||
|
||||
@@ -1777,6 +1777,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
"insert CDC generation data (UUID: {})", gen_uuid);
|
||||
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
|
||||
}
|
||||
break;
|
||||
case global_topology_request::cleanup:
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(guard), true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -2262,7 +2265,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard));
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), false);
|
||||
co_return true;
|
||||
}
|
||||
|
||||
@@ -3005,11 +3008,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
return muts;
|
||||
}
|
||||
|
||||
future<> start_cleanup_on_dirty_nodes(group0_guard guard) {
|
||||
future<> start_cleanup_on_dirty_nodes(group0_guard guard, bool global_request) {
|
||||
auto& topo = _topo_sm._topology;
|
||||
std::vector<canonical_mutation> muts;
|
||||
muts.reserve(topo.normal_nodes.size());
|
||||
muts.reserve(topo.normal_nodes.size() + size_t(global_request));
|
||||
|
||||
if (global_request) {
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.del_global_topology_request();
|
||||
muts.emplace_back(builder.build());
|
||||
}
|
||||
for (auto& [id, rs] : topo.normal_nodes) {
|
||||
if (rs.cleanup == cleanup_status::needed) {
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
@@ -4778,6 +4786,10 @@ future<> storage_service::uninit_address_map() {
|
||||
return _gossiper.unregister_(_raft_ip_address_updater);
|
||||
}
|
||||
|
||||
bool storage_service::is_topology_coordinator_enabled() const {
|
||||
return _raft_topology_change_enabled;
|
||||
}
|
||||
|
||||
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
@@ -6323,6 +6335,55 @@ future<> storage_service::do_drain() {
|
||||
co_await _repair.invoke_on_all(&repair_service::shutdown);
|
||||
}
|
||||
|
||||
future<> storage_service::do_cluster_cleanup() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::cleanup) {
|
||||
// FIXME: replace this with a queue
|
||||
throw std::runtime_error{
|
||||
"topology coordinator: cluster cleanup: a different topology request is already pending, try again later"};
|
||||
}
|
||||
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
|
||||
}
|
||||
|
||||
const auto& rs = it->second;
|
||||
|
||||
if (rs.state != node_state::normal) {
|
||||
throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
|
||||
}
|
||||
|
||||
slogger.info("raft topology: cluster cleanup requested");
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.set_global_topology_request(global_topology_request::cleanup);
|
||||
topology_change change{{builder.build()}};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: cleanup: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait cleanup finishes on all nodes
|
||||
co_await _topology_state_machine.event.when([this] {
|
||||
return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
|
||||
return n.second.cleanup == cleanup_status::clean;
|
||||
});
|
||||
});
|
||||
slogger.info("raft topology: cluster cleanup done");
|
||||
}
|
||||
|
||||
future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
|
||||
|
||||
@@ -339,6 +339,7 @@ public:
|
||||
future<> init_address_map(raft_address_map& address_map);
|
||||
|
||||
future<> uninit_address_map();
|
||||
bool is_topology_coordinator_enabled() const;
|
||||
|
||||
future<> drain_on_shutdown();
|
||||
|
||||
@@ -780,6 +781,7 @@ public:
|
||||
// Public for `reload_raft_topology_state` REST API.
|
||||
future<> topology_transition();
|
||||
|
||||
future<> do_cluster_cleanup();
|
||||
public:
|
||||
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst);
|
||||
future<> set_tablet_balancing_enabled(bool);
|
||||
|
||||
@@ -177,6 +177,7 @@ topology_request topology_request_from_string(const sstring& s) {
|
||||
|
||||
static std::unordered_map<global_topology_request, sstring> global_topology_request_to_name_map = {
|
||||
{global_topology_request::new_cdc_generation, "new_cdc_generation"},
|
||||
{global_topology_request::cleanup, "cleanup"},
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const global_topology_request& req) {
|
||||
|
||||
@@ -78,6 +78,7 @@ using request_param = std::variant<join_param, rebuild_param, removenode_param,
|
||||
|
||||
enum class global_topology_request: uint16_t {
|
||||
new_cdc_generation,
|
||||
cleanup,
|
||||
};
|
||||
|
||||
struct ring_slice {
|
||||
|
||||
@@ -10,12 +10,7 @@ import utils
|
||||
|
||||
def test_cleanup(nodetool):
|
||||
nodetool("cleanup", expected_requests=[
|
||||
expected_request("GET", "/storage_service/keyspaces", params={"type": "non_local_strategy"},
|
||||
response=["ks1", "ks2"]),
|
||||
expected_request("GET", "/storage_service/keyspaces", multiple=expected_request.ANY,
|
||||
response=["ks1", "ks2", "system"]),
|
||||
expected_request("POST", "/storage_service/keyspace_cleanup/ks1", response=0),
|
||||
expected_request("POST", "/storage_service/keyspace_cleanup/ks2", response=0),
|
||||
expected_request("POST", "/storage_service/cleanup_all", response=0),
|
||||
])
|
||||
|
||||
|
||||
|
||||
@@ -173,8 +173,14 @@ void cleanup_operation(scylla_rest_client& client, const bpo::variables_map& vm)
|
||||
}
|
||||
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace), std::move(params));
|
||||
} else {
|
||||
for (const auto& keyspace : get_keyspaces(client, "non_local_strategy")) {
|
||||
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace));
|
||||
auto res = client.post(format("/storage_service/cleanup_all"));
|
||||
if (res.IsObject()) {
|
||||
// If previous post returns an object instead of a simple value it means the request
|
||||
// failed either because server does not support it yet or topology coordinator is disabled.
|
||||
// Fall back to the old cleanup API.
|
||||
for (const auto& keyspace : get_keyspaces(client, "non_local_strategy")) {
|
||||
client.post(format("/storage_service/keyspace_cleanup/{}", keyspace));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user