diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 6dcdee7a5f..5a13607cb4 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -680,6 +680,9 @@ rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rp future, rpc::source> messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) { + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); @@ -712,6 +715,9 @@ do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr, rpc::source> messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM; + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(verb, id); return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); } @@ -728,6 +734,9 @@ void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::functi future, rpc::source> messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM; + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(verb, id); return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); } @@ -744,6 +753,9 @@ void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::functi future, rpc::source> messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM; + if (is_stopping()) { + return make_exception_future, rpc::source>(rpc::closed_error()); + } auto rpc_client = get_rpc_client(verb, id); return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); }