storage_service: Add force_remove_completion

It is needed by the

$ nodetool removenode force

command.
This commit is contained in:
Asias He
2016-04-11 10:46:12 +08:00
parent 7c7e5967f6
commit 9ffb95216d
2 changed files with 65 additions and 23 deletions

View File

@@ -1960,6 +1960,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
}
}
}
logger.info("remove_node: endpoint = {}, replicating_nodes = {}", endpoint, ss._replicating_nodes);
ss._removing_node = endpoint;
tm.add_leaving_endpoint(endpoint);
ss.update_pending_ranges().get();
@@ -1969,13 +1970,23 @@ future<> storage_service::remove_node(sstring host_id_string) {
gossiper.advertise_removing(endpoint, host_id, local_host_id).get();
// kick off streaming commands
ss.restore_replica_count(endpoint, my_address).get();
// No need to wait for restore_replica_count to complete, since
// when it completes, the node will be removed from _replicating_nodes,
// and we wait for _replicating_nodes to become empty below
ss.restore_replica_count(endpoint, my_address).handle_exception([endpoint, my_address] (auto ep) {
logger.info("Failed to restore_replica_count for node {} on node {}", endpoint, my_address);
});
// wait for ReplicationFinishedVerbHandler to signal we're done
while (!ss._replicating_nodes.empty()) {
while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) {
sleep(std::chrono::milliseconds(100)).get();
}
if (ss._force_remove_completion) {
ss._force_remove_completion = false;
throw std::runtime_error("nodetool removenode force is called by user");
}
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
ss.excise(std::move(tmp), endpoint);
@@ -2995,5 +3006,54 @@ future<sstring> storage_service::get_removal_status() {
});
}
future<> storage_service::force_remove_completion() {
return run_with_no_api_lock([] (storage_service& ss) {
return seastar::async([&ss] {
if (!ss._operation_in_progress.empty()) {
if (ss._operation_in_progress != sstring("remove_node")) {
throw std::runtime_error(sprint("Operation %s is in progress, try again", ss._operation_in_progress));
} else {
// This flag will make remove_node stop waiting for the confirmation
ss._force_remove_completion = true;
while (!ss._operation_in_progress.empty()) {
// Wait removenode operation to complete
logger.info("Operation {} is in progress, wait for it to complete", ss._operation_in_progress);
sleep(std::chrono::seconds(1)).get();
}
ss._force_remove_completion = false;
}
}
ss._operation_in_progress = sstring("removenode_force");
try {
if (!ss._replicating_nodes.empty() || !ss._token_metadata.get_leaving_endpoints().empty()) {
auto leaving = ss._token_metadata.get_leaving_endpoints();
logger.warn("Removal not confirmed for {}, Leaving={}", join(",", ss._replicating_nodes), leaving);
for (auto endpoint : leaving) {
utils::UUID host_id;
auto tokens = ss._token_metadata.get_tokens(endpoint);
try {
host_id = ss._token_metadata.get_host_id(endpoint);
} catch (...) {
logger.warn("No host_id is found for endpoint {}", endpoint);
continue;
}
gms::get_local_gossiper().advertise_token_removed(endpoint, host_id).get();
std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
ss.excise(tokens_set, endpoint);
}
ss._replicating_nodes.clear();
ss._removing_node = std::experimental::nullopt;
} else {
logger.warn("No tokens to force removal on, call 'removenode' first");
}
ss._operation_in_progress = {};
} catch (...) {
ss._operation_in_progress = {};
throw;
}
});
});
}
} // namespace service

View File

@@ -123,6 +123,7 @@ private:
shared_ptr<distributed<transport::cql_server>> _cql_server;
shared_ptr<distributed<thrift_server>> _thrift_server;
sstring _operation_in_progress;
bool _force_remove_completion = false;
bool _ms_stopped = false;
public:
storage_service(distributed<database>& db)
@@ -1930,33 +1931,14 @@ public:
* Get the status of a token removal.
*/
future<sstring> get_removal_status();
#if 0
/**
* Force a remove operation to complete. This may be necessary if a remove operation
* blocks forever due to node/stream failure. removeToken() must be called
* first, this is a last resort measure. No further attempt will be made to restore replicas.
*/
public void forceRemoveCompletion()
{
if (!replicatingNodes.isEmpty() || !_token_metadata.getLeavingEndpoints().isEmpty())
{
logger.warn("Removal not confirmed for for {}", StringUtils.join(this.replicatingNodes, ","));
for (InetAddress endpoint : _token_metadata.getLeavingEndpoints())
{
UUID hostId = _token_metadata.getHostId(endpoint);
Gossiper.instance.advertiseTokenRemoved(endpoint, hostId);
excise(_token_metadata.getTokens(endpoint), endpoint);
}
replicatingNodes.clear();
removingNode = null;
}
else
{
logger.warn("No tokens to force removal on, call 'removenode' first");
}
}
#endif
future<> force_remove_completion();
public:
/**
* Remove a node that has died, attempting to restore the replica count.