From 77a87cb2b69e274d702a01d83363b18536ce5f3a Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 26 Oct 2015 16:51:46 +0800 Subject: [PATCH] storage_service: Implement prepare_replacement_info Needed by replace node operation. --- service/storage_service.cc | 84 ++++++++++++++++++++------------------ service/storage_service.hh | 2 +- 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 4734ac0962..a877969343 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -140,7 +140,7 @@ future<> storage_service::prepare_to_join() { return make_ready_future<>(); } - std::map app_states; + auto app_states = make_shared>(); auto f = make_ready_future<>(); if (is_replacing() && !get_property_join_ring()) { throw std::runtime_error("Cannot set both join_ring=false and attempt to replace a node"); @@ -155,9 +155,11 @@ future<> storage_service::prepare_to_join() { if (!is_auto_bootstrap()) { throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); } - _bootstrap_tokens = prepare_replacement_info(); - app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens)); - app_states.emplace(gms::application_state::STATUS, value_factory.hibernate(true)); + f = prepare_replacement_info().then([this, app_states] (auto&& tokens) { + _bootstrap_tokens = tokens; + app_states->emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens)); + app_states->emplace(gms::application_state::STATUS, value_factory.hibernate(true)); + }); } else if (should_bootstrap()) { f = check_for_endpoint_collision(); } @@ -166,23 +168,23 @@ future<> storage_service::prepare_to_join() { // for bootstrap to get the load info it needs. // (we won't be part of the storage ring though until we add a counterId to our state, below.) // Seed the host ID-to-endpoint map with our own ID. - return f.then([app_states = std::move(app_states)] { + return f.then([] { return db::system_keyspace::get_local_host_id(); - }).then([this, app_states = std::move(app_states)] (auto local_host_id) mutable { + }).then([this, app_states] (auto local_host_id) mutable { _token_metadata.update_host_id(local_host_id, this->get_broadcast_address()); // FIXME: DatabaseDescriptor.getBroadcastRpcAddress() auto broadcast_rpc_address = this->get_broadcast_address(); - app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version()); - app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id)); - app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address)); - app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version()); + app_states->emplace(gms::application_state::NET_VERSION, value_factory.network_version()); + app_states->emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id)); + app_states->emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address)); + app_states->emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version()); logger.info("Starting up server gossip"); auto& gossiper = gms::get_local_gossiper(); gossiper.register_(this); // FIXME: SystemKeyspace.incrementAndGetGeneration() print("Start gossiper service ...\n"); - return gossiper.start(get_generation_number(), app_states).then([this] { + return gossiper.start(get_generation_number(), *app_states).then([this] { #if SS_DEBUG gms::get_local_gossiper().debug_show(); _token_metadata.debug_show(); @@ -1074,36 +1076,40 @@ void storage_service::remove_endpoint(inet_address endpoint) { }).get(); } -std::unordered_set storage_service::prepare_replacement_info() { - return std::unordered_set(); -#if 0 - logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress()); - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(FBUtilities.getLocalAddress()); +future> storage_service::prepare_replacement_info() { + if (!get_replace_address()) { + throw std::runtime_error(sprint("replace_address is empty")); + } + auto replace_address = get_replace_address().value(); + logger.info("Gathering node replacement information for {}", replace_address); + + // if (!MessagingService.instance().isListening()) + // MessagingService.instance().listen(FBUtilities.getLocalAddress()); // make magic happen - Gossiper.instance.doShadowRound(); - - UUID hostId = null; - // now that we've gossiped at least once, we should be able to find the node we're replacing - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) - throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); - hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); - try - { - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null) - throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS)))); - - SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc - Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need - return tokens; - } - catch (IOException e) - { - throw new RuntimeException(e); - } -#endif + return gms::get_local_gossiper().do_shadow_round().then([this, replace_address] { + auto& gossiper = gms::get_local_gossiper(); + // now that we've gossiped at least once, we should be able to find the node we're replacing + auto state = gossiper.get_endpoint_state_for_endpoint(replace_address); + if (!state) { + throw std::runtime_error(sprint("Cannot replace_address %s because it doesn't exist in gossip", replace_address)); + } + auto host_id = gossiper.get_host_id(replace_address); + auto eps = gossiper.get_endpoint_state_for_endpoint(replace_address); + if (!eps) { + throw std::runtime_error(sprint("Cannot replace_address %s because can not find gossip endpoint state", replace_address)); + } + auto value = eps->get_application_state(application_state::TOKENS); + if (!value) { + throw std::runtime_error(sprint("Could not find tokens for %s to replace", replace_address)); + } + auto tokens = get_tokens_for(replace_address); + // use the replacee's host Id as our own so we receive hints, etc + return db::system_keyspace::set_local_host_id(host_id).discard_result().then([replace_address, tokens = std::move(tokens)] { + gms::get_local_gossiper().reset_endpoint_state_map(); // clean up since we have what we need + return make_ready_future>(std::move(tokens)); + }); + }); } std::map storage_service::get_ownership() const { diff --git a/service/storage_service.hh b/service/storage_service.hh index 45e7b09359..ae44e0d455 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -303,7 +303,7 @@ public: } #endif public: - std::unordered_set prepare_replacement_info(); + future> prepare_replacement_info(); future<> check_for_endpoint_collision(); #if 0