diff --git a/main.cc b/main.cc index e06449a57e..1bbd3fbda3 100644 --- a/main.cc +++ b/main.cc @@ -967,12 +967,16 @@ int main(int ac, char** av) { mm.init_messaging_service(); }).get(); supervisor::notify("initializing storage proxy RPC verbs"); - proxy.invoke_on_all([] (service::storage_proxy& p) { - p.init_messaging_service(); - }).get(); + proxy.invoke_on_all(&service::storage_proxy::init_messaging_service).get(); + auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] { + proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get(); + }); supervisor::notify("starting streaming service"); streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get(); + auto stop_streaming_service = defer_verbose_shutdown("streaming service", [] { + streaming::stream_session::uninit_streaming_service().get(); + }); api::set_server_stream_manager(ctx).get(); supervisor::notify("starting hinted handoff manager"); @@ -1005,6 +1009,9 @@ int main(int ac, char** av) { rs.stop().get(); }); repair_init_messaging_service_handler(rs, sys_dist_ks, view_update_generator).get(); + auto stop_repair_messages = defer_verbose_shutdown("repair message handlers", [] { + repair_uninit_messaging_service_handler().get(); + }); supervisor::notify("starting storage service", true); auto& ss = service::get_local_storage_service(); ss.init_messaging_service_part().get(); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7369b82a7a..4bfb641808 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -796,6 +796,10 @@ void messaging_service::register_stream_mutation_fragments(std::function messaging_service::unregister_stream_mutation_fragments() { + return unregister_handler(messaging_verb::STREAM_MUTATION_FRAGMENTS); +} + template future, rpc::source> do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr rpc_client, std::unique_ptr& rpc) { @@ -827,6 +831,9 @@ rpc::sink messaging_service::make_sink_for_repair_g void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +future<> messaging_service::unregister_repair_get_row_diff_with_rpc_stream() { + return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM); +} // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM future, rpc::source> @@ -846,6 +853,9 @@ rpc::sink messaging_service::make_sink_for_repair_put_row_dif void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +future<> messaging_service::unregister_repair_put_row_diff_with_rpc_stream() { + return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM); +} // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM future, rpc::source> @@ -865,6 +875,9 @@ rpc::sink messaging_service::make_sink_for_repair_get_full void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func)); } +future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_stream() { + return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM); +} // Send a message for verb template @@ -948,6 +961,9 @@ future messaging_service::send_prepare_message(msg_a return send_message(this, messaging_verb::PREPARE_MESSAGE, id, std::move(msg), plan_id, std::move(description), reason); } +future<> messaging_service::unregister_prepare_message() { + return unregister_handler(messaging_verb::PREPARE_MESSAGE); +} // PREPARE_DONE_MESSAGE void messaging_service::register_prepare_done_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func) { @@ -957,6 +973,9 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id, return send_message(this, messaging_verb::PREPARE_DONE_MESSAGE, id, plan_id, dst_cpu_id); } +future<> messaging_service::unregister_prepare_done_message() { + return unregister_handler(messaging_verb::PREPARE_DONE_MESSAGE); +} // STREAM_MUTATION void messaging_service::register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional fragmented, rpc::optional reason)>&& func) { @@ -981,6 +1000,9 @@ future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, return send_message(this, messaging_verb::STREAM_MUTATION_DONE, id, plan_id, std::move(ranges), cf_id, dst_cpu_id); } +future<> messaging_service::unregister_stream_mutation_done() { + return unregister_handler(messaging_verb::STREAM_MUTATION_DONE); +} // COMPLETE_MESSAGE void messaging_service::register_complete_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional failed)>&& func) { @@ -990,6 +1012,9 @@ future<> messaging_service::send_complete_message(msg_addr id, UUID plan_id, uns return send_message(this, messaging_verb::COMPLETE_MESSAGE, id, plan_id, dst_cpu_id, failed); } +future<> messaging_service::unregister_complete_message() { + return unregister_handler(messaging_verb::COMPLETE_MESSAGE); +} void messaging_service::register_gossip_echo(std::function ()>&& func) { register_handler(this, messaging_verb::GOSSIP_ECHO, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 58c14ce103..5486c985c8 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -297,10 +297,12 @@ public: streaming::prepare_message msg, UUID plan_id, sstring description, rpc::optional reason)>&& func); future send_prepare_message(msg_addr id, streaming::prepare_message msg, UUID plan_id, sstring description, streaming::stream_reason); + future<> unregister_prepare_message(); // Wrapper for PREPARE_DONE_MESSAGE verb void register_prepare_done_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func); future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id); + future<> unregister_prepare_done_message(); // Wrapper for STREAM_MUTATION verb void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional, rpc::optional)>&& func); @@ -309,6 +311,7 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use. void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source)>&& func); + future<> unregister_stream_mutation_fragments(); rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); @@ -316,22 +319,27 @@ public: future, rpc::source> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); rpc::sink make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source); void register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + future<> unregister_repair_get_row_diff_with_rpc_stream(); // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM future, rpc::source> make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); rpc::sink make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source); void register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + future<> unregister_repair_put_row_diff_with_rpc_stream(); // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM future, rpc::source> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); rpc::sink make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source); void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + future<> unregister_repair_get_full_row_hashes_with_rpc_stream(); void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); + future<> unregister_stream_mutation_done(); void register_complete_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional failed)>&& func); future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed = false); + future<> unregister_complete_message(); // Wrapper for REPAIR_CHECKSUM_RANGE verb void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, dht::token_range range, rpc::optional hash_version)>&& func); diff --git a/repair/row_level.cc b/repair/row_level.cc index 96e00f54dd..08fabc2132 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2210,6 +2210,25 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed repair_uninit_messaging_service_handler() { + return netw::get_messaging_service().invoke_on_all([] (auto& ms) { + return when_all_succeed( + ms.unregister_repair_get_row_diff_with_rpc_stream(), + ms.unregister_repair_put_row_diff_with_rpc_stream(), + ms.unregister_repair_get_full_row_hashes_with_rpc_stream(), + ms.unregister_repair_get_full_row_hashes(), + ms.unregister_repair_get_combined_row_hash(), + ms.unregister_repair_get_sync_boundary(), + ms.unregister_repair_get_row_diff(), + ms.unregister_repair_put_row_diff(), + ms.unregister_repair_row_level_start(), + ms.unregister_repair_row_level_stop(), + ms.unregister_repair_get_estimated_partitions(), + ms.unregister_repair_set_estimated_partitions(), + ms.unregister_repair_get_diff_algorithms()).discard_result(); + }); +} + class row_level_repair { repair_info& _ri; sstring _cf_name; diff --git a/repair/row_level.hh b/repair/row_level.hh index dbf83e5d1a..a067752c09 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -45,6 +45,7 @@ private: }; future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator); +future<> repair_uninit_messaging_service_handler(); class repair_info; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 817d439483..d38b92a238 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5112,18 +5112,22 @@ void storage_proxy::init_messaging_service() { future<> storage_proxy::uninit_messaging_service() { auto& ms = netw::get_local_messaging_service(); return when_all_succeed( + ms.unregister_counter_mutation(), ms.unregister_mutation(), + ms.unregister_hint_mutation(), ms.unregister_mutation_done(), ms.unregister_mutation_failed(), ms.unregister_read_data(), ms.unregister_read_mutation_data(), ms.unregister_read_digest(), ms.unregister_truncate(), + ms.unregister_get_schema_version(), ms.unregister_paxos_prepare(), ms.unregister_paxos_accept(), ms.unregister_paxos_learn(), ms.unregister_paxos_prune() ).discard_result(); + } future>, cache_temperature>> @@ -5217,8 +5221,7 @@ future<> storage_proxy::drain_on_shutdown() { future<> storage_proxy::stop() { - // FIXME: hints manager should be stopped here but it seems like this function is never called - return uninit_messaging_service(); + return make_ready_future<>(); } } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index effb329bac..5e57dfad6a 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -314,7 +314,6 @@ private: cdc_stats _cdc_stats; private: - future<> uninit_messaging_service(); future query_singular(lw_shared_ptr cmd, dht::partition_range_vector&& partition_ranges, db::consistency_level cl, @@ -469,6 +468,7 @@ public: return next; } void init_messaging_service(); + future<> uninit_messaging_service(); // Applies mutation on this node. // Resolves with timed_out_error when timeout is reached. diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 264e0a6c5f..26e59a1d69 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -317,6 +317,15 @@ void stream_session::init_messaging_service_handler() { }); } +future<> stream_session::uninit_messaging_service_handler() { + return when_all_succeed( + ms().unregister_prepare_message(), + ms().unregister_prepare_done_message(), + ms().unregister_stream_mutation_fragments(), + ms().unregister_stream_mutation_done(), + ms().unregister_complete_message()).discard_result(); +} + distributed* stream_session::_db; distributed* stream_session::_sys_dist_ks; distributed* stream_session::_view_update_generator; @@ -340,9 +349,13 @@ future<> stream_session::init_streaming_service(distributed& db, distr // }); return get_stream_manager().start().then([] { gms::get_local_gossiper().register_(get_local_stream_manager().shared_from_this()); - return _db->invoke_on_all([] (auto& db) { - init_messaging_service_handler(); - }); + return smp::invoke_on_all([] { init_messaging_service_handler(); }); + }); +} + +future<> stream_session::uninit_streaming_service() { + return smp::invoke_on_all([] { + return uninit_messaging_service_handler(); }); } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 444bae50e7..cdc8da3965 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -142,6 +142,7 @@ private: using token = dht::token; using ring_position = dht::ring_position; static void init_messaging_service_handler(); + static future<> uninit_messaging_service_handler(); static distributed* _db; static distributed* _sys_dist_ks; static distributed* _view_update_generator; @@ -152,6 +153,7 @@ public: static database& get_local_db() { return _db->local(); } static distributed& get_db() { return *_db; }; static future<> init_streaming_service(distributed& db, distributed& sys_dist_ks, distributed& view_update_generator); + static future<> uninit_streaming_service(); public: /** * Streaming endpoint.