From 5d3e4d7b73cbbb3450afe6077903fbc3769aec4a Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 30 Jul 2019 17:52:56 +0800 Subject: [PATCH] messaging_service: Check if messaging_service is stopped before get_rpc_client get_rpc_client assumes the messaging_service is not stopped. We should check is_stopping() before we call get_rpc_client. We do such check in existing code, e.g., send_message and friends. Do the same check in the newly introduced make_sink_and_source_for_stream_mutation_fragments() and friends for row level repair. Fixes: #4767 --- message/messaging_service.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 6dcdee7a5f..5a13607cb4 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -680,6 +680,9 @@ rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rp future, rpc::source> messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) { + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); @@ -712,6 +715,9 @@ do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr, rpc::source> messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM; + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(verb, id); return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); } @@ -728,6 +734,9 @@ void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::functi future, rpc::source> messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM; + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(verb, id); return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); } @@ -744,6 +753,9 @@ void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::functi future, rpc::source> messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM; + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(verb, id); return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); }