diff --git a/service/storage_service.cc b/service/storage_service.cc index fc30bbc638..978c33ab95 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1960,6 +1960,7 @@ future<> storage_service::remove_node(sstring host_id_string) { } } } + logger.info("remove_node: endpoint = {}, replicating_nodes = {}", endpoint, ss._replicating_nodes); ss._removing_node = endpoint; tm.add_leaving_endpoint(endpoint); ss.update_pending_ranges().get(); @@ -1969,13 +1970,23 @@ future<> storage_service::remove_node(sstring host_id_string) { gossiper.advertise_removing(endpoint, host_id, local_host_id).get(); // kick off streaming commands - ss.restore_replica_count(endpoint, my_address).get(); + // No need to wait for restore_replica_count to complete, since + // when it completes, the node will be removed from _replicating_nodes, + // and we wait for _replicating_nodes to become empty below + ss.restore_replica_count(endpoint, my_address).handle_exception([endpoint, my_address] (auto ep) { + logger.info("Failed to restore_replica_count for node {} on node {}", endpoint, my_address); + }); // wait for ReplicationFinishedVerbHandler to signal we're done - while (!ss._replicating_nodes.empty()) { + while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) { sleep(std::chrono::milliseconds(100)).get(); } + if (ss._force_remove_completion) { + ss._force_remove_completion = false; + throw std::runtime_error("nodetool removenode force is called by user"); + } + std::unordered_set tmp(tokens.begin(), tokens.end()); ss.excise(std::move(tmp), endpoint); @@ -2995,5 +3006,54 @@ future storage_service::get_removal_status() { }); } +future<> storage_service::force_remove_completion() { + return run_with_no_api_lock([] (storage_service& ss) { + return seastar::async([&ss] { + if (!ss._operation_in_progress.empty()) { + if (ss._operation_in_progress != sstring("remove_node")) { + throw std::runtime_error(sprint("Operation %s is in progress, try again", ss._operation_in_progress)); + } else { + // This flag will make remove_node stop waiting for the confirmation + ss._force_remove_completion = true; + while (!ss._operation_in_progress.empty()) { + // Wait removenode operation to complete + logger.info("Operation {} is in progress, wait for it to complete", ss._operation_in_progress); + sleep(std::chrono::seconds(1)).get(); + } + ss._force_remove_completion = false; + } + } + ss._operation_in_progress = sstring("removenode_force"); + try { + if (!ss._replicating_nodes.empty() || !ss._token_metadata.get_leaving_endpoints().empty()) { + auto leaving = ss._token_metadata.get_leaving_endpoints(); + logger.warn("Removal not confirmed for {}, Leaving={}", join(",", ss._replicating_nodes), leaving); + for (auto endpoint : leaving) { + utils::UUID host_id; + auto tokens = ss._token_metadata.get_tokens(endpoint); + try { + host_id = ss._token_metadata.get_host_id(endpoint); + } catch (...) { + logger.warn("No host_id is found for endpoint {}", endpoint); + continue; + } + gms::get_local_gossiper().advertise_token_removed(endpoint, host_id).get(); + std::unordered_set tokens_set(tokens.begin(), tokens.end()); + ss.excise(tokens_set, endpoint); + } + ss._replicating_nodes.clear(); + ss._removing_node = std::experimental::nullopt; + } else { + logger.warn("No tokens to force removal on, call 'removenode' first"); + } + ss._operation_in_progress = {}; + } catch (...) { + ss._operation_in_progress = {}; + throw; + } + }); + }); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index d865499627..4b5fa0a451 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -123,6 +123,7 @@ private: shared_ptr> _cql_server; shared_ptr> _thrift_server; sstring _operation_in_progress; + bool _force_remove_completion = false; bool _ms_stopped = false; public: storage_service(distributed& db) @@ -1930,33 +1931,14 @@ public: * Get the status of a token removal. */ future get_removal_status(); -#if 0 /** * Force a remove operation to complete. This may be necessary if a remove operation * blocks forever due to node/stream failure. removeToken() must be called * first, this is a last resort measure. No further attempt will be made to restore replicas. */ - public void forceRemoveCompletion() - { - if (!replicatingNodes.isEmpty() || !_token_metadata.getLeavingEndpoints().isEmpty()) - { - logger.warn("Removal not confirmed for for {}", StringUtils.join(this.replicatingNodes, ",")); - for (InetAddress endpoint : _token_metadata.getLeavingEndpoints()) - { - UUID hostId = _token_metadata.getHostId(endpoint); - Gossiper.instance.advertiseTokenRemoved(endpoint, hostId); - excise(_token_metadata.getTokens(endpoint), endpoint); - } - replicatingNodes.clear(); - removingNode = null; - } - else - { - logger.warn("No tokens to force removal on, call 'removenode' first"); - } - } -#endif + future<> force_remove_completion(); + public: /** * Remove a node that has died, attempting to restore the replica count.