storage_service: Simplify prepare_to_join with seastar thread

This commit is contained in:
Asias He
2015-12-04 09:22:57 +08:00
parent e9a4d93d1b
commit b3dd2d976a
2 changed files with 39 additions and 44 deletions

View File

@@ -141,13 +141,13 @@ bool storage_service::should_bootstrap() {
return is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && !get_seeds().count(get_broadcast_address());
}
future<> storage_service::prepare_to_join() {
// Runs inside seastar::async context
void storage_service::prepare_to_join() {
if (_joined) {
return make_ready_future<>();
return;
}
auto app_states = make_shared<std::map<gms::application_state, gms::versioned_value>>();
auto f = make_ready_future<>();
std::map<gms::application_state, gms::versioned_value> app_states;
if (db().local().is_replacing() && !get_property_join_ring()) {
throw std::runtime_error("Cannot set both join_ring=false and attempt to replace a node");
}
@@ -161,56 +161,51 @@ future<> storage_service::prepare_to_join() {
if (!is_auto_bootstrap()) {
throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
}
f = prepare_replacement_info().then([this, app_states] (auto&& tokens) {
_bootstrap_tokens = tokens;
app_states->emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens));
app_states->emplace(gms::application_state::STATUS, value_factory.hibernate(true));
});
_bootstrap_tokens = prepare_replacement_info().get0();
app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens));
app_states.emplace(gms::application_state::STATUS, value_factory.hibernate(true));
} else if (should_bootstrap()) {
f = check_for_endpoint_collision();
check_for_endpoint_collision().get();
}
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
return f.then([] {
return db::system_keyspace::get_local_host_id();
}).then([this, app_states] (auto local_host_id) mutable {
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
app_states->emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states->emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states->emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states->emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
auto local_host_id = db::system_keyspace::get_local_host_id().get0();
_token_metadata.update_host_id(local_host_id, get_broadcast_address());
auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this->shared_from_this());
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
return gossiper.start_gossiping(get_generation_number(), *app_states).then([this] {
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this->shared_from_this());
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
gossiper.start_gossiping(get_generation_number(), app_states).then([this] {
#if SS_DEBUG
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
#endif
});
}).then([this] {
// gossip snitch infos (local DC and rack)
return gossip_snitch_info().then([this] {
auto& proxy = service::get_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
}).get();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
// gossip snitch infos (local DC and rack)
gossip_snitch_info().get();
auto& proxy = service::get_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
update_schema_version_and_announce(proxy).get();// Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
#endif
});
});
}
// Runs inside seastar::async context
@@ -1003,7 +998,7 @@ future<> storage_service::init_server(int delay) {
}, "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
#endif
prepare_to_join().get();
prepare_to_join();
#if 0
// Has to be called after the host id has potentially changed in prepareToJoin().
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())

View File

@@ -365,7 +365,7 @@ public:
#endif
private:
bool should_bootstrap();
future<> prepare_to_join();
void prepare_to_join();
void join_token_ring(int delay);
public:
future<> join_ring();