diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index bdbfc6eb2a..f3f6867a3e 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -73,6 +73,7 @@ struct send_info { size_t mutations_nr{0}; semaphore mutations_done{0}; bool error_logged = false; + mutation_reader reader; send_info(database& db_, utils::UUID plan_id_, utils::UUID cf_id_, query::partition_range pr_, net::messaging_service::msg_addr id_, uint32_t dst_cpu_id_) @@ -82,18 +83,21 @@ struct send_info { , pr(pr_) , id(id_) , dst_cpu_id(dst_cpu_id_) { + auto& cf = db.find_column_family(this->cf_id); + // reader = std::move(cf.make_streaming_reader(cf.schema(), this->pr)); + reader = cf.make_streaming_reader(cf.schema(), this->pr); } }; -future<> do_send_mutations(auto si, auto fm, bool fragmented) { - return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable { +future<> do_send_mutations(auto& si, auto fm, bool fragmented) { + return get_local_stream_manager().mutation_send_limiter().wait().then([&si, fragmented, fm = std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] { + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([&si, fm_size] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); si->mutations_done.signal(); - }).handle_exception([si] (auto ep) { + }).handle_exception([&si] (auto ep) { // There might be larger number of STREAM_MUTATION inflight. // Log one error per column_family per range if (!si->error_logged) { @@ -108,17 +112,16 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) { } future<> send_mutations(auto si) { - auto& cf = si->db.find_column_family(si->cf_id); - return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) { - return repeat([si, &reader] () { - return reader().then([si] (auto smopt) { + return do_with(std::move(si), [] (auto& si) { + return repeat([&si] () { + return si->reader().then([&si] (auto smopt) { if (smopt && si->db.column_family_exists(si->cf_id)) { size_t fragment_size = default_frozen_fragment_size; // Mutations cannot be sent fragmented if the receiving side doesn't support that. if (!service::get_local_storage_service().cluster_supports_large_partitions()) { fragment_size = std::numeric_limits::max(); } - return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) { + return fragment_and_freeze(std::move(*smopt), [&si] (auto fm, bool fragmented) { si->mutations_nr++; return do_send_mutations(si, std::move(fm), fragmented); }, fragment_size).then([] { return stop_iteration::no; }); @@ -126,9 +129,9 @@ future<> send_mutations(auto si) { return make_ready_future(stop_iteration::yes); } }); + }).then([&si] { + return si->mutations_done.wait(si->mutations_nr); }); - }).then([si] { - return si->mutations_done.wait(si->mutations_nr); }); } @@ -149,9 +152,23 @@ void stream_transfer_task::start() { [this, plan_id, cf_id, id, dst_cpu_id, pr] (unsigned shard) { sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}, invoke_on shard={}", plan_id, cf_id, shard); return this->session->get_db().invoke_on(shard, [plan_id, cf_id, id, dst_cpu_id, pr] (database& db) { + auto si = make_foreign(make_shared(db, plan_id, cf_id, pr, id, dst_cpu_id)); + return make_ready_future>>(std::move(si)); + }).then([this, shard] (auto si) { + this->_send_infos.emplace(shard, std::move(si)); + return make_ready_future<>(); + }); + }); + }).then([this, plan_id, cf_id, id] { + auto&cf = this->session->get_local_db().find_column_family(cf_id); + sslog.info("[Stream #{}] stream_transfer_task: cf_id={}, ks={}, cf={}, send_info.size={} to peer {}", + plan_id, cf_id, cf.schema()->ks_name(), cf.schema()->cf_name(), this->_send_infos.size(), id); + return do_for_each(this->_send_infos, [this] (auto& item) mutable { + auto shard = item.first; + auto si = std::move(item.second); + return this->session->get_db().invoke_on(shard, [si = std::move(si)] (database& db) mutable { // Send mutations on related shards, do not capture this - auto si = make_lw_shared(db, plan_id, cf_id, pr, id, dst_cpu_id); - return send_mutations(si); + return send_mutations(std::move(si)); }); }); }).then([this, plan_id, cf_id, id] { diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index c1f3712346..f72e27a767 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -48,6 +48,7 @@ namespace streaming { class stream_session; +class send_info; /** * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. @@ -59,6 +60,7 @@ private: // A stream_transfer_task always contains the same range to stream std::vector> _ranges; long _total_size; + std::unordered_multimap>> _send_infos; public: using UUID = utils::UUID; stream_transfer_task(stream_transfer_task&&) = default;