diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 2a2006b998..7cbcc210cc 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "message/messaging_service.hh" #include @@ -390,24 +391,28 @@ gms::inet_address messaging_service::listen_address() { return _cfg.ip; } -static future<> stop_servers(std::array, 2>& servers) { - return parallel_for_each( +static future<> do_with_servers(std::string_view what, std::array, 2>& servers, auto method) { + mlogger.info("{} server", what); + co_await coroutine::parallel_for_each( servers | boost::adaptors::filtered([] (auto& ptr) { return bool(ptr); }) | boost::adaptors::indirected, - std::mem_fn(&messaging_service::rpc_protocol_server_wrapper::stop)); + method); + mlogger.info("{} server - Done", what); +} + +future<> messaging_service::shutdown_tls_server() { + return do_with_servers("Shutting down tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::shutdown)); +} + +future<> messaging_service::shutdown_nontls_server() { + return do_with_servers("Shutting down nontls", _server, std::mem_fn(&rpc_protocol_server_wrapper::shutdown)); } future<> messaging_service::stop_tls_server() { - mlogger.info("Stopping tls server"); - return stop_servers(_server_tls).then( [] { - mlogger.info("Stopping tls server - Done"); - }); + return do_with_servers("Stopping tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::stop)); } future<> messaging_service::stop_nontls_server() { - mlogger.info("Stopping nontls server"); - return stop_servers(_server).then([] { - mlogger.info("Stopping nontls server - Done"); - }); + return do_with_servers("Stopping nontls", _server, std::mem_fn(&rpc_protocol_server_wrapper::stop)); } future<> messaging_service::stop_client() { @@ -423,25 +428,26 @@ future<> messaging_service::stop_client() { future<> messaging_service::shutdown() { _shutting_down = true; - return when_all(stop_nontls_server(), stop_tls_server(), stop_client()).discard_result(); + return when_all(shutdown_nontls_server(), shutdown_tls_server(), stop_client()).discard_result(); } future<> messaging_service::stop() { - return unregister_handler(messaging_verb::CLIENT_ID).then([this] { - if (_rpc->has_handlers()) { - mlogger.error("RPC server still has handlers registered"); - for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST; - verb = messaging_verb(int(verb) + 1)) { - if (_rpc->has_handler(verb)) { - mlogger.error(" - {}", static_cast(verb)); - } + if (!_shutting_down) { + co_await shutdown(); + } + co_await when_all(stop_nontls_server(), stop_tls_server()); + co_await unregister_handler(messaging_verb::CLIENT_ID); + if (_rpc->has_handlers()) { + mlogger.error("RPC server still has handlers registered"); + for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST; + verb = messaging_verb(int(verb) + 1)) { + if (_rpc->has_handler(verb)) { + mlogger.error(" - {}", static_cast(verb)); } - - std::abort(); } - return make_ready_future<>(); - }); + std::abort(); + } } rpc::no_wait_type messaging_service::no_wait() { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index a8203cf369..c32530cc3f 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -310,6 +310,8 @@ private: std::vector _scheduling_info_for_connection_index; std::vector _connection_index_for_tenant; + future<> shutdown_tls_server(); + future<> shutdown_nontls_server(); future<> stop_tls_server(); future<> stop_nontls_server(); future<> stop_client(); diff --git a/seastar b/seastar index 09063faa42..8d7cc3129d 160000 --- a/seastar +++ b/seastar @@ -1 +1 @@ -Subproject commit 09063faa42b6e0c3923348804e994dfa77479b41 +Subproject commit 8d7cc3129d5e51fc7ee9cb053d70c1b26f4b299b