Merge tag 'ms_update/v2' from seastar-dev.git

From Asias:

In f27e5d2a6 (messaging_service: Delay listening ms during boot up),
messaging_service startup is splitted into two stages. Adjust the api
registration code and fix up the messaging_service stop code.
This commit is contained in:
Tomasz Grabiec
2016-06-08 10:25:14 +02:00
3 changed files with 32 additions and 12 deletions

View File

@@ -540,7 +540,6 @@ int main(int ac, char** av) {
api::set_server_storage_service(ctx).get();
api::set_server_gossip(ctx).get();
api::set_server_snitch(ctx).get();
api::set_server_messaging_service(ctx).get();
api::set_server_storage_proxy(ctx).get();
api::set_server_load_sstable(ctx).get();
supervisor_notify("initializing migration manager RPC verbs");
@@ -566,6 +565,7 @@ int main(int ac, char** av) {
supervisor_notify("starting storage service", true);
auto& ss = service::get_local_storage_service();
ss.init_server().get();
api::set_server_messaging_service(ctx).get();
api::set_server_storage_service(ctx).get();
supervisor_notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {

View File

@@ -176,9 +176,11 @@ void messaging_service::foreach_client(std::function<void(const msg_addr& id, co
}
void messaging_service::foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const {
_server->foreach_connection([f](const rpc_protocol::server::connection& c) {
f(c.info(), c.get_stats());
});
if (_server) {
_server->foreach_connection([f](const rpc_protocol::server::connection& c) {
f(c.info(), c.get_stats());
});
}
}
void messaging_service::increment_dropped_messages(messaging_verb verb) {
@@ -298,16 +300,31 @@ gms::inet_address messaging_service::listen_address() {
return _listen_address;
}
future<> messaging_service::stop_tls_server() {
if (_server_tls) {
return _server_tls->stop();
}
return make_ready_future<>();
}
future<> messaging_service::stop_nontls_server() {
if (_server) {
return _server->stop();
}
return make_ready_future<>();
}
future<> messaging_service::stop_client() {
return parallel_for_each(_clients, [] (auto& m) {
return parallel_for_each(m, [] (std::pair<const msg_addr, shard_info>& c) {
return c.second.rpc_client->stop();
});
});
}
future<> messaging_service::stop() {
_stopping = true;
return when_all(
_server->stop(),
parallel_for_each(_clients, [] (auto& m) {
return parallel_for_each(m, [] (std::pair<const msg_addr, shard_info>& c) {
return c.second.rpc_client->stop();
});
})
).discard_result();
return when_all(stop_nontls_server(), stop_tls_server(), stop_client()).discard_result();
}
rpc::no_wait_type messaging_service::no_wait() {

View File

@@ -194,6 +194,9 @@ public:
void start_listen();
uint16_t port();
gms::inet_address listen_address();
future<> stop_tls_server();
future<> stop_nontls_server();
future<> stop_client();
future<> stop();
static rpc::no_wait_type no_wait();
bool is_stopping() { return _stopping; }