From d78bc60a74653d869a39c7c68a6b1177e9e2372f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 14 Jun 2023 09:21:20 +0300 Subject: [PATCH 1/7] Update seastar submodule * seastar 09063faa...8d7cc312 (1): > rpc: Introduce server::shutdown() Signed-off-by: Pavel Emelyanov --- seastar | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seastar b/seastar index 09063faa42..8d7cc3129d 160000 --- a/seastar +++ b/seastar @@ -1 +1 @@ -Subproject commit 09063faa42b6e0c3923348804e994dfa77479b41 +Subproject commit 8d7cc3129d5e51fc7ee9cb053d70c1b26f4b299b From bdafe2b98cad93abd1692b33dd62c5fff4800625 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 12 Dec 2022 15:42:12 +0100 Subject: [PATCH 2/7] messaging: Shutdown on stop() if it wasn't shut down earlier All rpc::client objects have to be stopped before they are destroyed. Currently this is done in messaging_service::shutdown(). The cql_test_env does not call shutdown() currently. This can lead to use-after-free on the rpc::client object, manifesting like this: Segmentation fault on shard 0. Backtrace: column_mapping::~column_mapping() at schema.cc:? db::cql_table_large_data_handler::internal_record_large_cells(sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long) const at ./db/large_data_handler.cc:180 operator() at ./db/large_data_handler.cc:123 (inlined by) seastar::future std::__invoke_impl, db::cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value)::$_1&, sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long>(std::__invoke_other, db::cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value)::$_1&, sstables::sstable const&, sstables::key const&, clustering_key_prefix const*&&, column_definition const&, unsigned long&&, unsigned long&&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/invoke.h:61 (inlined by) std::enable_if, db::cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value)::$_1&, sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long>, seastar::future >::type std::__invoke_r, db::cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value)::$_1&, sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long>(db::cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value)::$_1&, sstables::sstable const&, sstables::key const&, clustering_key_prefix const*&&, column_definition const&, unsigned long&&, unsigned long&&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/invoke.h:114 (inlined by) std::_Function_handler (sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long), db::cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value, utils::updateable_value)::$_1>::_M_invoke(std::_Any_data const&, sstables::sstable const&, sstables::key const&, clustering_key_prefix const*&&, column_definition const&, unsigned long&&, unsigned long&&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/std_function.h:290 std::function (sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long)>::operator()(sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long) const at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/std_function.h:591 (inlined by) db::cql_table_large_data_handler::record_large_cells(sstables::sstable const&, sstables::key const&, clustering_key_prefix const*, column_definition const&, unsigned long, unsigned long) const at ./db/large_data_handler.cc:175 seastar::rpc::log_exception(seastar::rpc::connection&, seastar::log_level, char const*, std::__exception_ptr::exception_ptr) at ./build/release/seastar/./seastar/src/rpc/rpc.cc:109 operator() at ./build/release/seastar/./seastar/src/rpc/rpc.cc:788 operator() at ./build/release/seastar/./seastar/include/seastar/core/future.hh:1682 (inlined by) void seastar::futurize >::satisfy_with_result_of::then_wrapped_nrvo, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14>(seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&&)::{lambda(seastar::internal::promise_base_with_type&&, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&, seastar::future_state&&)#1}::operator()(seastar::internal::promise_base_with_type&&, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&, seastar::future_state&&) const::{lambda()#1}>(seastar::internal::promise_base_with_type&&, seastar::future::then_wrapped_nrvo, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14>(seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&&)::{lambda(seastar::internal::promise_base_with_type&&, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&, seastar::future_state&&)#1}::operator()(seastar::internal::promise_base_with_type&&, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&, seastar::future_state&&) const::{lambda()#1}&&) at ./build/release/seastar/./seastar/include/seastar/core/future.hh:2134 (inlined by) operator() at ./build/release/seastar/./seastar/include/seastar/core/future.hh:1681 (inlined by) seastar::continuation, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14, seastar::future::then_wrapped_nrvo, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14>(seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&&)::{lambda(seastar::internal::promise_base_with_type&&, seastar::rpc::client::client(seastar::rpc::logger const&, void*, seastar::rpc::client_options, seastar::socket, seastar::socket_address const&, seastar::socket_address const&)::$_14&, seastar::future_state&&)#1}, void>::run_and_dispose() at ./build/release/seastar/./seastar/include/seastar/core/future.hh:781 seastar::reactor::run_tasks(seastar::reactor::task_queue&) at ./build/release/seastar/./seastar/src/core/reactor.cc:2319 (inlined by) seastar::reactor::run_some_tasks() at ./build/release/seastar/./seastar/src/core/reactor.cc:2756 seastar::reactor::do_run() at ./build/release/seastar/./seastar/src/core/reactor.cc:2925 seastar::reactor::run() at ./build/release/seastar/./seastar/src/core/reactor.cc:2808 seastar::app_template::run_deprecated(int, char**, std::function&&) at ./build/release/seastar/./seastar/src/core/app-template.cc:265 seastar::app_template::run(int, char**, std::function ()>&&) at ./build/release/seastar/./seastar/src/core/app-template.cc:156 operator() at ./build/release/seastar/./seastar/src/testing/test_runner.cc:75 (inlined by) void std::__invoke_impl(std::__invoke_other, seastar::testing::test_runner::start_thread(int, char**)::$_0&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/invoke.h:61 (inlined by) std::enable_if, void>::type std::__invoke_r(seastar::testing::test_runner::start_thread(int, char**)::$_0&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/invoke.h:111 (inlined by) std::_Function_handler::_M_invoke(std::_Any_data const&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/std_function.h:290 std::function::operator()() const at /usr/bin/../lib/gcc/x86_64-redhat-linux/12/../../../../include/c++/12/bits/std_function.h:591 (inlined by) seastar::posix_thread::start_routine(void*) at ./build/release/seastar/./seastar/src/core/posix.cc:73 Fix by making sure that shutdown() is called prior to destruction. Fixes #12244 Closes #12276 --- message/messaging_service.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 2a2006b998..870b65b582 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -427,6 +427,11 @@ future<> messaging_service::shutdown() { } future<> messaging_service::stop() { + if (!_shutting_down) { + return shutdown().then([this] { + return stop(); + }); + } return unregister_handler(messaging_verb::CLIENT_ID).then([this] { if (_rpc->has_handlers()) { mlogger.error("RPC server still has handlers registered"); From fabc7df720394c1cb412753e1028659defa6022b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 25 May 2023 13:51:23 +0300 Subject: [PATCH 3/7] messaging_service: Coroutinize stop_servers() Signed-off-by: Pavel Emelyanov --- message/messaging_service.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 870b65b582..016eca0156 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "message/messaging_service.hh" #include @@ -391,7 +392,7 @@ gms::inet_address messaging_service::listen_address() { } static future<> stop_servers(std::array, 2>& servers) { - return parallel_for_each( + co_await coroutine::parallel_for_each( servers | boost::adaptors::filtered([] (auto& ptr) { return bool(ptr); }) | boost::adaptors::indirected, std::mem_fn(&messaging_service::rpc_protocol_server_wrapper::stop)); } From 8877f0b28a6391232d84a8c86fb9084da9be3d77 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 25 May 2023 14:26:06 +0300 Subject: [PATCH 4/7] messaging_service: Coroutinize stop() Signed-off-by: Pavel Emelyanov --- message/messaging_service.cc | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 016eca0156..4b7cdfdf94 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -429,11 +429,9 @@ future<> messaging_service::shutdown() { future<> messaging_service::stop() { if (!_shutting_down) { - return shutdown().then([this] { - return stop(); - }); + co_await shutdown(); } - return unregister_handler(messaging_verb::CLIENT_ID).then([this] { + co_await unregister_handler(messaging_verb::CLIENT_ID); if (_rpc->has_handlers()) { mlogger.error("RPC server still has handlers registered"); for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST; @@ -445,9 +443,6 @@ future<> messaging_service::stop() { std::abort(); } - - return make_ready_future<>(); - }); } rpc::no_wait_type messaging_service::no_wait() { From b27c5567faab1a123f4b0f20e1d343b47e6ab55f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 25 May 2023 14:26:17 +0300 Subject: [PATCH 5/7] messaging_service: Restore indentation after previous patch Signed-off-by: Pavel Emelyanov --- message/messaging_service.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 4b7cdfdf94..aecfc56ae3 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -432,17 +432,17 @@ future<> messaging_service::stop() { co_await shutdown(); } co_await unregister_handler(messaging_verb::CLIENT_ID); - if (_rpc->has_handlers()) { - mlogger.error("RPC server still has handlers registered"); - for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST; - verb = messaging_verb(int(verb) + 1)) { - if (_rpc->has_handler(verb)) { - mlogger.error(" - {}", static_cast(verb)); - } + if (_rpc->has_handlers()) { + mlogger.error("RPC server still has handlers registered"); + for (auto verb = messaging_verb::MUTATION; verb < messaging_verb::LAST; + verb = messaging_verb(int(verb) + 1)) { + if (_rpc->has_handler(verb)) { + mlogger.error(" - {}", static_cast(verb)); } - - std::abort(); } + + std::abort(); + } } rpc::no_wait_type messaging_service::no_wait() { From 4075daf96d450c12054c4f2ed26ecdac6197af43 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 25 May 2023 13:57:28 +0300 Subject: [PATCH 6/7] messaging_service: Generalize stop_servers() Make it do_with_servers() and make it accept method to call and message to print. This gives the ability to reuse this helper in next patch Signed-off-by: Pavel Emelyanov --- message/messaging_service.cc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index aecfc56ae3..b29e1c8514 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -391,24 +391,20 @@ gms::inet_address messaging_service::listen_address() { return _cfg.ip; } -static future<> stop_servers(std::array, 2>& servers) { +static future<> do_with_servers(std::string_view what, std::array, 2>& servers, auto method) { + mlogger.info("{} server", what); co_await coroutine::parallel_for_each( servers | boost::adaptors::filtered([] (auto& ptr) { return bool(ptr); }) | boost::adaptors::indirected, - std::mem_fn(&messaging_service::rpc_protocol_server_wrapper::stop)); + method); + mlogger.info("{} server - Done", what); } future<> messaging_service::stop_tls_server() { - mlogger.info("Stopping tls server"); - return stop_servers(_server_tls).then( [] { - mlogger.info("Stopping tls server - Done"); - }); + return do_with_servers("Stopping tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::stop)); } future<> messaging_service::stop_nontls_server() { - mlogger.info("Stopping nontls server"); - return stop_servers(_server).then([] { - mlogger.info("Stopping nontls server - Done"); - }); + return do_with_servers("Stopping nontls", _server, std::mem_fn(&rpc_protocol_server_wrapper::stop)); } future<> messaging_service::stop_client() { From 87531915d93d32f201fc77d320b072e42774fe31 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 25 May 2023 14:29:35 +0300 Subject: [PATCH 7/7] messaging_service: Shutdown rpc server on shutdown The RPC server now has a lighter .shutdown() method that just does what m.s. shutdown() needs, so call it. On stop call regular stop to finalize the stopping process backport: The messaging_service::shutdown() had conflict due to missing e147681d851 commit Signed-off-by: Pavel Emelyanov --- message/messaging_service.cc | 11 ++++++++++- message/messaging_service.hh | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index b29e1c8514..7cbcc210cc 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -399,6 +399,14 @@ static future<> do_with_servers(std::string_view what, std::array messaging_service::shutdown_tls_server() { + return do_with_servers("Shutting down tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::shutdown)); +} + +future<> messaging_service::shutdown_nontls_server() { + return do_with_servers("Shutting down nontls", _server, std::mem_fn(&rpc_protocol_server_wrapper::shutdown)); +} + future<> messaging_service::stop_tls_server() { return do_with_servers("Stopping tls", _server_tls, std::mem_fn(&rpc_protocol_server_wrapper::stop)); } @@ -420,13 +428,14 @@ future<> messaging_service::stop_client() { future<> messaging_service::shutdown() { _shutting_down = true; - return when_all(stop_nontls_server(), stop_tls_server(), stop_client()).discard_result(); + return when_all(shutdown_nontls_server(), shutdown_tls_server(), stop_client()).discard_result(); } future<> messaging_service::stop() { if (!_shutting_down) { co_await shutdown(); } + co_await when_all(stop_nontls_server(), stop_tls_server()); co_await unregister_handler(messaging_verb::CLIENT_ID); if (_rpc->has_handlers()) { mlogger.error("RPC server still has handlers registered"); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index a8203cf369..c32530cc3f 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -310,6 +310,8 @@ private: std::vector _scheduling_info_for_connection_index; std::vector _connection_index_for_tenant; + future<> shutdown_tls_server(); + future<> shutdown_nontls_server(); future<> stop_tls_server(); future<> stop_nontls_server(); future<> stop_client();