From b89ced4635cd89b4bb66bc413b882fefb5673b62 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 8 Oct 2019 16:31:02 +0800 Subject: [PATCH] streaming: Do not open rpc stream connection if reader has no data We can use the reader::peek() to check if the reader contains any data. If not, do not open the rpc stream connection. It helps to reduce the port usage. Refs: #4943 --- streaming/stream_transfer_task.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index e1e4fa0b02..9388480387 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -176,6 +176,13 @@ future<> send_mutations(lw_shared_ptr si) { } future<> send_mutation_fragments(lw_shared_ptr si) { + return si->reader.peek(db::no_timeout).then([si] (mutation_fragment* mfp) { + if (!mfp) { + // The reader contains no data + sslog.info("[Stream #{}] Skip sending ks={}, cf={}, reader contains no data, with new rpc streaming", + si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name()); + return make_ready_future<>(); + } return si->estimate_partitions().then([si] (size_t estimated_partitions) { sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions); return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { @@ -233,6 +240,7 @@ future<> send_mutation_fragments(lw_shared_ptr si) { }); }); }); + }); } future<> stream_transfer_task::execute() {