Backport 'Merge 'Enlighten messaging_service::shutdown()''
This includes seastar update titled 'Merge 'Split rpc::server stop into two parts'' Includes backport of #12244 fix * br-5.1-backport-ms-shutdown: messaging_service: Shutdown rpc server on shutdown messaging_service: Generalize stop_servers() messaging_service: Restore indentation after previous patch messaging_service: Coroutinize stop() messaging_service: Coroutinize stop_servers() messaging: Shutdown on stop() if it wasn't shut down earlier Update seastar submodule refs: #14031
This commit is contained in:
@@ -9,6 +9,7 @@
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
|
||||
#include "message/messaging_service.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
@@ -390,24 +391,28 @@ gms::inet_address messaging_service::listen_address() {
|
||||
return _cfg.ip;
|
||||
}
|
||||
|
||||
static future<> stop_servers(std::array<std::unique_ptr<messaging_service::rpc_protocol_server_wrapper>, 2>& servers) {
|
||||
return parallel_for_each(
|
||||
static future<> do_with_servers(std::string_view what, std::array<std::unique_ptr<messaging_service::rpc_protocol_server_wrapper>, 2>& servers, auto method) {
|
||||
mlogger.info("{} server", what);
|
||||
co_await coroutine::parallel_for_each(
|
||||
servers | boost::adaptors::filtered([] (auto& ptr) { return bool(ptr); }) | boost::adaptors::indirected,
|
||||
std::mem_fn(&messaging_service::rpc_protocol_server_wrapper::stop));
|
||||
method);
|
||||
mlogger.info("{} server - Done", what);
|
||||
}
|
||||
|
||||
future<> messaging_service::shutdown_tls_server() {
|
||||
return do_with_servers("Shutting down tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::shutdown));
|
||||
}
|
||||
|
||||
future<> messaging_service::shutdown_nontls_server() {
|
||||
return do_with_servers("Shutting down nontls", _server, std::mem_fn(&rpc_protocol_server_wrapper::shutdown));
|
||||
}
|
||||
|
||||
future<> messaging_service::stop_tls_server() {
|
||||
mlogger.info("Stopping tls server");
|
||||
return stop_servers(_server_tls).then( [] {
|
||||
mlogger.info("Stopping tls server - Done");
|
||||
});
|
||||
return do_with_servers("Stopping tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::stop));
|
||||
}
|
||||
|
||||
future<> messaging_service::stop_nontls_server() {
|
||||
mlogger.info("Stopping nontls server");
|
||||
return stop_servers(_server).then([] {
|
||||
mlogger.info("Stopping nontls server - Done");
|
||||
});
|
||||
return do_with_servers("Stopping nontls", _server, std::mem_fn(&rpc_protocol_server_wrapper::stop));
|
||||
}
|
||||
|
||||
future<> messaging_service::stop_client() {
|
||||
@@ -423,25 +428,26 @@ future<> messaging_service::stop_client() {
|
||||
|
||||
future<> messaging_service::shutdown() {
|
||||
_shutting_down = true;
|
||||
return when_all(stop_nontls_server(), stop_tls_server(), stop_client()).discard_result();
|
||||
return when_all(shutdown_nontls_server(), shutdown_tls_server(), stop_client()).discard_result();
|
||||
}
|
||||
|
||||
future<> messaging_service::stop() {
|
||||
return unregister_handler(messaging_verb::CLIENT_ID).then([this] {
|
||||
if (_rpc->has_handlers()) {
|
||||
mlogger.error("RPC server still has handlers registered");
|
||||
for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST;
|
||||
verb = messaging_verb(int(verb) + 1)) {
|
||||
if (_rpc->has_handler(verb)) {
|
||||
mlogger.error(" - {}", static_cast<int>(verb));
|
||||
}
|
||||
if (!_shutting_down) {
|
||||
co_await shutdown();
|
||||
}
|
||||
co_await when_all(stop_nontls_server(), stop_tls_server());
|
||||
co_await unregister_handler(messaging_verb::CLIENT_ID);
|
||||
if (_rpc->has_handlers()) {
|
||||
mlogger.error("RPC server still has handlers registered");
|
||||
for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST;
|
||||
verb = messaging_verb(int(verb) + 1)) {
|
||||
if (_rpc->has_handler(verb)) {
|
||||
mlogger.error(" - {}", static_cast<int>(verb));
|
||||
}
|
||||
|
||||
std::abort();
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
|
||||
rpc::no_wait_type messaging_service::no_wait() {
|
||||
|
||||
@@ -310,6 +310,8 @@ private:
|
||||
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
|
||||
std::vector<tenant_connection_index> _connection_index_for_tenant;
|
||||
|
||||
future<> shutdown_tls_server();
|
||||
future<> shutdown_nontls_server();
|
||||
future<> stop_tls_server();
|
||||
future<> stop_nontls_server();
|
||||
future<> stop_client();
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 09063faa42...8d7cc3129d
Reference in New Issue
Block a user