diff --git a/main.cc b/main.cc index 0f8cb2dba4..f344d6b84d 100644 --- a/main.cc +++ b/main.cc @@ -540,7 +540,6 @@ int main(int ac, char** av) { api::set_server_storage_service(ctx).get(); api::set_server_gossip(ctx).get(); api::set_server_snitch(ctx).get(); - api::set_server_messaging_service(ctx).get(); api::set_server_storage_proxy(ctx).get(); api::set_server_load_sstable(ctx).get(); supervisor_notify("initializing migration manager RPC verbs"); @@ -566,6 +565,7 @@ int main(int ac, char** av) { supervisor_notify("starting storage service", true); auto& ss = service::get_local_storage_service(); ss.init_server().get(); + api::set_server_messaging_service(ctx).get(); api::set_server_storage_service(ctx).get(); supervisor_notify("starting batchlog manager"); db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index eb5945c8f9..c97c2ba03b 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -176,9 +176,11 @@ void messaging_service::foreach_client(std::function&& f) const { - _server->foreach_connection([f](const rpc_protocol::server::connection& c) { - f(c.info(), c.get_stats()); - }); + if (_server) { + _server->foreach_connection([f](const rpc_protocol::server::connection& c) { + f(c.info(), c.get_stats()); + }); + } } void messaging_service::increment_dropped_messages(messaging_verb verb) { @@ -298,16 +300,31 @@ gms::inet_address messaging_service::listen_address() { return _listen_address; } +future<> messaging_service::stop_tls_server() { + if (_server_tls) { + return _server_tls->stop(); + } + return make_ready_future<>(); +} + +future<> messaging_service::stop_nontls_server() { + if (_server) { + return _server->stop(); + } + return make_ready_future<>(); +} + +future<> messaging_service::stop_client() { + return parallel_for_each(_clients, [] (auto& m) { + return parallel_for_each(m, [] (std::pair& c) { + return c.second.rpc_client->stop(); + }); + }); +} + future<> messaging_service::stop() { _stopping = true; - return when_all( - _server->stop(), - parallel_for_each(_clients, [] (auto& m) { - return parallel_for_each(m, [] (std::pair& c) { - return c.second.rpc_client->stop(); - }); - }) - ).discard_result(); + return when_all(stop_nontls_server(), stop_tls_server(), stop_client()).discard_result(); } rpc::no_wait_type messaging_service::no_wait() { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index eab4052717..5b19286bc1 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -194,6 +194,9 @@ public: void start_listen(); uint16_t port(); gms::inet_address listen_address(); + future<> stop_tls_server(); + future<> stop_nontls_server(); + future<> stop_client(); future<> stop(); static rpc::no_wait_type no_wait(); bool is_stopping() { return _stopping; }