diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 6dc283685c..0d68e74365 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -85,7 +85,6 @@ struct send_info { }; future do_send_mutations(auto si, auto fm) { - return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] { @@ -100,26 +99,27 @@ future do_send_mutations(auto si, auto fm) { sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); } si->mutations_done.broken(); - }).finally([] { - get_local_stream_manager().mutation_send_limiter().signal(); }); - return stop_iteration::no; - }); + return make_ready_future(stop_iteration::no); } future<> send_mutations(auto si) { auto& cf = si->db.find_column_family(si->cf_id); auto& priority = service::get_local_streaming_read_priority(); return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) { - return repeat([si, &reader] () { - return reader().then([si] (auto mopt) { - if (mopt && si->db.column_family_exists(si->cf_id)) { - si->mutations_nr++; - auto fm = frozen_mutation(*mopt); - return do_send_mutations(si, std::move(fm)); - } else { - return make_ready_future(stop_iteration::yes); - } + return repeat([si, &reader] { + return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] { + return reader().then([si] (auto mopt) { + if (mopt && si->db.column_family_exists(si->cf_id)) { + si->mutations_nr++; + auto fm = frozen_mutation(*mopt); + return do_send_mutations(si, std::move(fm)); + } else { + return make_ready_future(stop_iteration::yes); + } + }); + }).finally([] { + get_local_stream_manager().mutation_send_limiter().signal(); }); }); }).then([si] { @@ -132,7 +132,7 @@ void stream_transfer_task::start() { auto cf_id = this->cf_id; auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id}; sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id); - parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) { + do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) { unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0; unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count; auto cf_id = this->cf_id;