storage_service: Implement prepare_replacement_info

Needed by replace node operation.
This commit is contained in:
Asias He
2015-10-26 16:51:46 +08:00
parent 5cdbc3701a
commit 77a87cb2b6
2 changed files with 46 additions and 40 deletions

View File

@@ -140,7 +140,7 @@ future<> storage_service::prepare_to_join() {
return make_ready_future<>();
}
std::map<gms::application_state, gms::versioned_value> app_states;
auto app_states = make_shared<std::map<gms::application_state, gms::versioned_value>>();
auto f = make_ready_future<>();
if (is_replacing() && !get_property_join_ring()) {
throw std::runtime_error("Cannot set both join_ring=false and attempt to replace a node");
@@ -155,9 +155,11 @@ future<> storage_service::prepare_to_join() {
if (!is_auto_bootstrap()) {
throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
}
_bootstrap_tokens = prepare_replacement_info();
app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens));
app_states.emplace(gms::application_state::STATUS, value_factory.hibernate(true));
f = prepare_replacement_info().then([this, app_states] (auto&& tokens) {
_bootstrap_tokens = tokens;
app_states->emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens));
app_states->emplace(gms::application_state::STATUS, value_factory.hibernate(true));
});
} else if (should_bootstrap()) {
f = check_for_endpoint_collision();
}
@@ -166,23 +168,23 @@ future<> storage_service::prepare_to_join() {
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
return f.then([app_states = std::move(app_states)] {
return f.then([] {
return db::system_keyspace::get_local_host_id();
}).then([this, app_states = std::move(app_states)] (auto local_host_id) mutable {
}).then([this, app_states] (auto local_host_id) mutable {
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
// FIXME: DatabaseDescriptor.getBroadcastRpcAddress()
auto broadcast_rpc_address = this->get_broadcast_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
app_states->emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states->emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states->emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states->emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this);
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
return gossiper.start(get_generation_number(), app_states).then([this] {
return gossiper.start(get_generation_number(), *app_states).then([this] {
#if SS_DEBUG
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
@@ -1074,36 +1076,40 @@ void storage_service::remove_endpoint(inet_address endpoint) {
}).get();
}
std::unordered_set<token> storage_service::prepare_replacement_info() {
return std::unordered_set<token>();
#if 0
logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
future<std::unordered_set<token>> storage_service::prepare_replacement_info() {
if (!get_replace_address()) {
throw std::runtime_error(sprint("replace_address is empty"));
}
auto replace_address = get_replace_address().value();
logger.info("Gathering node replacement information for {}", replace_address);
// if (!MessagingService.instance().isListening())
// MessagingService.instance().listen(FBUtilities.getLocalAddress());
// make magic happen
Gossiper.instance.doShadowRound();
UUID hostId = null;
// now that we've gossiped at least once, we should be able to find the node we're replacing
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
try
{
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
#endif
return gms::get_local_gossiper().do_shadow_round().then([this, replace_address] {
auto& gossiper = gms::get_local_gossiper();
// now that we've gossiped at least once, we should be able to find the node we're replacing
auto state = gossiper.get_endpoint_state_for_endpoint(replace_address);
if (!state) {
throw std::runtime_error(sprint("Cannot replace_address %s because it doesn't exist in gossip", replace_address));
}
auto host_id = gossiper.get_host_id(replace_address);
auto eps = gossiper.get_endpoint_state_for_endpoint(replace_address);
if (!eps) {
throw std::runtime_error(sprint("Cannot replace_address %s because can not find gossip endpoint state", replace_address));
}
auto value = eps->get_application_state(application_state::TOKENS);
if (!value) {
throw std::runtime_error(sprint("Could not find tokens for %s to replace", replace_address));
}
auto tokens = get_tokens_for(replace_address);
// use the replacee's host Id as our own so we receive hints, etc
return db::system_keyspace::set_local_host_id(host_id).discard_result().then([replace_address, tokens = std::move(tokens)] {
gms::get_local_gossiper().reset_endpoint_state_map(); // clean up since we have what we need
return make_ready_future<std::unordered_set<token>>(std::move(tokens));
});
});
}
std::map<gms::inet_address, float> storage_service::get_ownership() const {

View File

@@ -303,7 +303,7 @@ public:
}
#endif
public:
std::unordered_set<token> prepare_replacement_info();
future<std::unordered_set<token>> prepare_replacement_info();
future<> check_for_endpoint_collision();
#if 0