messaging_service: Check if messaging_service is stopped before get_rpc_client

get_rpc_client assumes the messaging_service is not stopped. We should check
is_stopping() before we call get_rpc_client.

We do such check in existing code, e.g., send_message and friends. Do
the same check in the newly introduced
make_sink_and_source_for_stream_mutation_fragments() and friends for row
level repair.

Fixes: #4767
This commit is contained in:
Asias He
2019-07-30 17:52:56 +08:00
committed by Avi Kivity
parent 74349bdf7e
commit 5d3e4d7b73

View File

@@ -680,6 +680,9 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
if (is_stopping()) {
return make_exception_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
@@ -712,6 +715,9 @@ do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr<mes
future<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>
messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
if (is_stopping()) {
return make_exception_future<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(verb, id);
return do_make_sink_source<repair_hash_with_cmd, repair_row_on_wire_with_cmd>(verb, repair_meta_id, std::move(rpc_client), rpc());
}
@@ -728,6 +734,9 @@ void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::functi
future<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>
messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
if (is_stopping()) {
return make_exception_future<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(verb, id);
return do_make_sink_source<repair_row_on_wire_with_cmd, repair_stream_cmd>(verb, repair_meta_id, std::move(rpc_client), rpc());
}
@@ -744,6 +753,9 @@ void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::functi
future<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>
messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
if (is_stopping()) {
return make_exception_future<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(verb, id);
return do_make_sink_source<repair_stream_cmd, repair_hash_with_cmd>(verb, repair_meta_id, std::move(rpc_client), rpc());
}