From a9aa10e8deefad45c5dc5b19b84eb99beb95205f Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 23 Jul 2020 12:03:49 +0300 Subject: [PATCH] Merge "Unregister RPC verbs on stop" from Pavel E " There are 5 services that register their RPC handlers in messaging service, but only a few of them unregister them on stop. Unregistering is somewhat critical, not just because it makes the code look clean, but also because unregistration does wait for the message processing to complete, thus avoiding use-after-frees in the handlers. In particular, several handlers call service::get_schema_for_write() which, in turn, may end up in service::maybe_sync() calling for the local migration manager instance. All those handlers' processing must be waited for before stopping the migration manager. The set brings the RPC handlers unregistration in sync with the registration part. tests: unit (dev) dtest (dev: simple_boot_shutdown, repair) start-stop by hand (dev) fixes: #6904 " * 'br-rpc-unregister-verbs' of https://github.com/xemul/scylla: main: Add missing calls to unregister RPC hanlers messaging: Add missing per-service unregistering methods messaging: Add missing handlers unregistration helpers streaming: Do not use db->invoke_on_all in vain storage_proxy: Detach rpc unregistration from stop main: Shorten call to storage_proxy::init_messaging_service (cherry picked from commit 01b838e29133b54f92f22dd0ce9b045b0d22b64f) --- main.cc | 13 ++++++++++--- message/messaging_service.cc | 25 +++++++++++++++++++++++++ message/messaging_service.hh | 8 ++++++++ repair/row_level.cc | 19 +++++++++++++++++++ repair/row_level.hh | 1 + service/storage_proxy.cc | 7 +++++-- service/storage_proxy.hh | 2 +- streaming/stream_session.cc | 19 ++++++++++++++++--- streaming/stream_session.hh | 2 ++ 9 files changed, 87 insertions(+), 9 deletions(-) diff --git a/main.cc b/main.cc index e06449a57e..1bbd3fbda3 100644 --- a/main.cc +++ b/main.cc @@ -967,12 +967,16 @@ int main(int ac, char** av) { 
mm.init_messaging_service(); }).get(); supervisor::notify("initializing storage proxy RPC verbs"); - proxy.invoke_on_all([] (service::storage_proxy& p) { - p.init_messaging_service(); - }).get(); + proxy.invoke_on_all(&service::storage_proxy::init_messaging_service).get(); + auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] { + proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get(); + }); supervisor::notify("starting streaming service"); streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get(); + auto stop_streaming_service = defer_verbose_shutdown("streaming service", [] { + streaming::stream_session::uninit_streaming_service().get(); + }); api::set_server_stream_manager(ctx).get(); supervisor::notify("starting hinted handoff manager"); @@ -1005,6 +1009,9 @@ int main(int ac, char** av) { rs.stop().get(); }); repair_init_messaging_service_handler(rs, sys_dist_ks, view_update_generator).get(); + auto stop_repair_messages = defer_verbose_shutdown("repair message handlers", [] { + repair_uninit_messaging_service_handler().get(); + }); supervisor::notify("starting storage service", true); auto& ss = service::get_local_storage_service(); ss.init_messaging_service_part().get(); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7369b82a7a..4bfb641808 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -796,6 +796,10 @@ void messaging_service::register_stream_mutation_fragments(std::function messaging_service::unregister_stream_mutation_fragments() { + return unregister_handler(messaging_verb::STREAM_MUTATION_FRAGMENTS); +} + template future, rpc::source> do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr rpc_client, std::unique_ptr& rpc) { @@ -827,6 +831,9 @@ rpc::sink messaging_service::make_sink_for_repair_g void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function> 
(const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +future<> messaging_service::unregister_repair_get_row_diff_with_rpc_stream() { + return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM); +} // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM future, rpc::source> @@ -846,6 +853,9 @@ rpc::sink messaging_service::make_sink_for_repair_put_row_dif void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +future<> messaging_service::unregister_repair_put_row_diff_with_rpc_stream() { + return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM); +} // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM future, rpc::source> @@ -865,6 +875,9 @@ rpc::sink messaging_service::make_sink_for_repair_get_full void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func)); } +future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_stream() { + return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM); +} // Send a message for verb template @@ -948,6 +961,9 @@ future messaging_service::send_prepare_message(msg_a return send_message(this, messaging_verb::PREPARE_MESSAGE, id, std::move(msg), plan_id, std::move(description), reason); } +future<> messaging_service::unregister_prepare_message() { + return unregister_handler(messaging_verb::PREPARE_MESSAGE); +} // PREPARE_DONE_MESSAGE void 
messaging_service::register_prepare_done_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func) { @@ -957,6 +973,9 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id, return send_message(this, messaging_verb::PREPARE_DONE_MESSAGE, id, plan_id, dst_cpu_id); } +future<> messaging_service::unregister_prepare_done_message() { + return unregister_handler(messaging_verb::PREPARE_DONE_MESSAGE); +} // STREAM_MUTATION void messaging_service::register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional fragmented, rpc::optional reason)>&& func) { @@ -981,6 +1000,9 @@ future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, return send_message(this, messaging_verb::STREAM_MUTATION_DONE, id, plan_id, std::move(ranges), cf_id, dst_cpu_id); } +future<> messaging_service::unregister_stream_mutation_done() { + return unregister_handler(messaging_verb::STREAM_MUTATION_DONE); +} // COMPLETE_MESSAGE void messaging_service::register_complete_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional failed)>&& func) { @@ -990,6 +1012,9 @@ future<> messaging_service::send_complete_message(msg_addr id, UUID plan_id, uns return send_message(this, messaging_verb::COMPLETE_MESSAGE, id, plan_id, dst_cpu_id, failed); } +future<> messaging_service::unregister_complete_message() { + return unregister_handler(messaging_verb::COMPLETE_MESSAGE); +} void messaging_service::register_gossip_echo(std::function ()>&& func) { register_handler(this, messaging_verb::GOSSIP_ECHO, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 58c14ce103..5486c985c8 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -297,10 +297,12 @@ public: streaming::prepare_message msg, UUID plan_id, sstring description, rpc::optional 
reason)>&& func); future send_prepare_message(msg_addr id, streaming::prepare_message msg, UUID plan_id, sstring description, streaming::stream_reason); + future<> unregister_prepare_message(); // Wrapper for PREPARE_DONE_MESSAGE verb void register_prepare_done_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func); future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id); + future<> unregister_prepare_done_message(); // Wrapper for STREAM_MUTATION verb void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional, rpc::optional)>&& func); @@ -309,6 +311,7 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use. 
void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source)>&& func); + future<> unregister_stream_mutation_fragments(); rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); @@ -316,22 +319,27 @@ public: future, rpc::source> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); rpc::sink make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source); void register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + future<> unregister_repair_get_row_diff_with_rpc_stream(); // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM future, rpc::source> make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); rpc::sink make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source); void register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + future<> unregister_repair_put_row_diff_with_rpc_stream(); // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM future, rpc::source> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); rpc::sink make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source); void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + future<> unregister_repair_get_full_row_hashes_with_rpc_stream(); void 
register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); + future<> unregister_stream_mutation_done(); void register_complete_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional failed)>&& func); future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed = false); + future<> unregister_complete_message(); // Wrapper for REPAIR_CHECKSUM_RANGE verb void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, dht::token_range range, rpc::optional hash_version)>&& func); diff --git a/repair/row_level.cc b/repair/row_level.cc index 96e00f54dd..08fabc2132 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2210,6 +2210,25 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed repair_uninit_messaging_service_handler() { + return netw::get_messaging_service().invoke_on_all([] (auto& ms) { + return when_all_succeed( + ms.unregister_repair_get_row_diff_with_rpc_stream(), + ms.unregister_repair_put_row_diff_with_rpc_stream(), + ms.unregister_repair_get_full_row_hashes_with_rpc_stream(), + ms.unregister_repair_get_full_row_hashes(), + ms.unregister_repair_get_combined_row_hash(), + ms.unregister_repair_get_sync_boundary(), + ms.unregister_repair_get_row_diff(), + ms.unregister_repair_put_row_diff(), + ms.unregister_repair_row_level_start(), + ms.unregister_repair_row_level_stop(), + ms.unregister_repair_get_estimated_partitions(), + ms.unregister_repair_set_estimated_partitions(), + ms.unregister_repair_get_diff_algorithms()).discard_result(); + }); +} + class row_level_repair { repair_info& _ri; sstring _cf_name; diff --git a/repair/row_level.hh b/repair/row_level.hh index dbf83e5d1a..a067752c09 100644 --- 
a/repair/row_level.hh +++ b/repair/row_level.hh @@ -45,6 +45,7 @@ private: }; future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator); +future<> repair_uninit_messaging_service_handler(); class repair_info; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 817d439483..d38b92a238 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5112,18 +5112,22 @@ void storage_proxy::init_messaging_service() { future<> storage_proxy::uninit_messaging_service() { auto& ms = netw::get_local_messaging_service(); return when_all_succeed( + ms.unregister_counter_mutation(), ms.unregister_mutation(), + ms.unregister_hint_mutation(), ms.unregister_mutation_done(), ms.unregister_mutation_failed(), ms.unregister_read_data(), ms.unregister_read_mutation_data(), ms.unregister_read_digest(), ms.unregister_truncate(), + ms.unregister_get_schema_version(), ms.unregister_paxos_prepare(), ms.unregister_paxos_accept(), ms.unregister_paxos_learn(), ms.unregister_paxos_prune() ).discard_result(); + } future>, cache_temperature>> @@ -5217,8 +5221,7 @@ future<> storage_proxy::drain_on_shutdown() { future<> storage_proxy::stop() { - // FIXME: hints manager should be stopped here but it seems like this function is never called - return uninit_messaging_service(); + return make_ready_future<>(); } } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index effb329bac..5e57dfad6a 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -314,7 +314,6 @@ private: cdc_stats _cdc_stats; private: - future<> uninit_messaging_service(); future query_singular(lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, @@ -469,6 +468,7 @@ public: return next; } void init_messaging_service(); + future<> uninit_messaging_service(); // Applies mutation on this node. // Resolves with timed_out_error when timeout is reached. 
diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 264e0a6c5f..26e59a1d69 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -317,6 +317,15 @@ void stream_session::init_messaging_service_handler() { }); } +future<> stream_session::uninit_messaging_service_handler() { + return when_all_succeed( + ms().unregister_prepare_message(), + ms().unregister_prepare_done_message(), + ms().unregister_stream_mutation_fragments(), + ms().unregister_stream_mutation_done(), + ms().unregister_complete_message()).discard_result(); +} + distributed* stream_session::_db; distributed* stream_session::_sys_dist_ks; distributed* stream_session::_view_update_generator; @@ -340,9 +349,13 @@ future<> stream_session::init_streaming_service(distributed& db, distr // }); return get_stream_manager().start().then([] { gms::get_local_gossiper().register_(get_local_stream_manager().shared_from_this()); - return _db->invoke_on_all([] (auto& db) { - init_messaging_service_handler(); - }); + return smp::invoke_on_all([] { init_messaging_service_handler(); }); + }); +} + +future<> stream_session::uninit_streaming_service() { + return smp::invoke_on_all([] { + return uninit_messaging_service_handler(); }); } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 444bae50e7..cdc8da3965 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -142,6 +142,7 @@ private: using token = dht::token; using ring_position = dht::ring_position; static void init_messaging_service_handler(); + static future<> uninit_messaging_service_handler(); static distributed* _db; static distributed* _sys_dist_ks; static distributed* _view_update_generator; @@ -152,6 +153,7 @@ public: static database& get_local_db() { return _db->local(); } static distributed& get_db() { return *_db; }; static future<> init_streaming_service(distributed& db, distributed& sys_dist_ks, distributed& view_update_generator); + static future<> 
uninit_streaming_service(); public: /** * Streaming endpoint.