storage_service: Enable more code in prepare_to_join

This commit is contained in:
Asias He
2015-08-06 09:29:42 +08:00
parent f7b3b9646f
commit efe067fd74

View File

@@ -78,81 +78,76 @@ bool storage_service::should_bootstrap() {
}
future<> storage_service::prepare_to_join() {
if (!_joined) {
#if 0
if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
if (DatabaseDescriptor.isReplacing())
{
if (SystemKeyspace.bootstrapComplete())
throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
if (!DatabaseDescriptor.isAutoBootstrap())
throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
_bootstrap_tokens = prepareReplacementInfo();
appStates.put(ApplicationState.TOKENS, valueFactory.tokens(_bootstrap_tokens));
appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
}
else if (should_bootstrap())
{
checkForEndpointCollision();
}
#endif
auto f = make_ready_future<>();
if (should_bootstrap()) {
f = check_for_endpoint_collision();
}
if (_joined) {
return make_ready_future<>();
}
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
return f.then([] {
return db::system_keyspace::get_local_host_id();
}).then([this] (auto local_host_id) {
std::map<gms::application_state, gms::versioned_value> app_states;
std::map<gms::application_state, gms::versioned_value> app_states;
auto f = make_ready_future<>();
if (is_replacing() && !get_property_join_ring()) {
throw std::runtime_error("Cannot set both join_ring=false and attempt to replace a node");
}
if (get_replace_tokens().size() > 0 || get_replace_node()) {
throw std::runtime_error("Replace method removed; use cassandra.replace_address instead");
}
if (is_replacing()) {
// if (SystemKeyspace.bootstrapComplete())
// throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
if (!is_auto_bootstrap()) {
throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
}
_bootstrap_tokens = prepare_replacement_info();
app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens));
app_states.emplace(gms::application_state::STATUS, value_factory.hibernate(true));
} else if (should_bootstrap()) {
f = check_for_endpoint_collision();
}
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
// FIXME: DatabaseDescriptor.getBroadcastRpcAddress()
auto broadcast_rpc_address = this->get_broadcast_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
return f.then([app_states = std::move(app_states)] {
return db::system_keyspace::get_local_host_id();
}).then([this, app_states = std::move(app_states)] (auto local_host_id) mutable {
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
// FIXME: DatabaseDescriptor.getBroadcastRpcAddress()
auto broadcast_rpc_address = this->get_broadcast_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this);
using namespace std::chrono;
auto now = high_resolution_clock::now().time_since_epoch();
int generation_number = duration_cast<seconds>(now).count();
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
return gossiper.start(generation_number, app_states).then([this] {
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this);
using namespace std::chrono;
auto now = high_resolution_clock::now().time_since_epoch();
int generation_number = duration_cast<seconds>(now).count();
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
return gossiper.start(generation_number, app_states).then([this] {
#if SS_DEBUG
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
#endif
});
}).then([this] {
// gossip snitch infos (local DC and rack)
gossip_snitch_info();
auto& proxy = service::get_local_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
#endif
});
}
return make_ready_future<>();
}).then([this] {
// gossip snitch infos (local DC and rack)
gossip_snitch_info();
auto& proxy = service::get_local_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
#endif
});
}
future<> storage_service::join_token_ring(int delay) {