diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index e1e4fa0b02..9388480387 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -176,6 +176,13 @@ future<> send_mutations(lw_shared_ptr si) { } future<> send_mutation_fragments(lw_shared_ptr si) { + return si->reader.peek(db::no_timeout).then([si] (mutation_fragment* mfp) { + if (!mfp) { + // The reader contains no data + sslog.info("[Stream #{}] Skip sending ks={}, cf={}, reader contains no data, with new rpc streaming", + si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name()); + return make_ready_future<>(); + } return si->estimate_partitions().then([si] (size_t estimated_partitions) { sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions); return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { @@ -233,6 +240,7 @@ future<> send_mutation_fragments(lw_shared_ptr si) { }); }); }); + }); } future<> stream_transfer_task::execute() {