From dc50ce0ce57b2bfe46dfceb4cd7ad06e2e3d62dc Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 17 Nov 2016 00:11:13 +0000 Subject: [PATCH] streaming: Make the mutation readers when streaming starts Currently we make the mutation readers for streaming at different points in time, i.e., do_for_each(_ranges.begin(), _ranges.end(), [] (auto range) { make a mutation reader for this range read mutations from the reader and send }) If there is a write workload in the background, we will stream extra data, since the later the reader is made the more data we need to send. Fix it by making all the readers before starting to stream. Fixes #1815 Message-Id: <1479341474-1364-2-git-send-email-asias@scylladb.com> --- streaming/stream_transfer_task.cc | 43 +++++++++++++++++++++---------- streaming/stream_transfer_task.hh | 2 ++ 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index bdbfc6eb2a..f3f6867a3e 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -73,6 +73,7 @@ struct send_info { size_t mutations_nr{0}; semaphore mutations_done{0}; bool error_logged = false; + mutation_reader reader; send_info(database& db_, utils::UUID plan_id_, utils::UUID cf_id_, query::partition_range pr_, net::messaging_service::msg_addr id_, uint32_t dst_cpu_id_) @@ -82,18 +83,21 @@ struct send_info { , pr(pr_) , id(id_) , dst_cpu_id(dst_cpu_id_) { + auto& cf = db.find_column_family(this->cf_id); + // reader = std::move(cf.make_streaming_reader(cf.schema(), this->pr)); + reader = cf.make_streaming_reader(cf.schema(), this->pr); } }; -future<> do_send_mutations(auto si, auto fm, bool fragmented) { - return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable { +future<> do_send_mutations(auto& si, auto fm, bool fragmented) { + return get_local_stream_manager().mutation_send_limiter().wait().then([&si, fragmented, fm = 
std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] { + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([&si, fm_size] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); si->mutations_done.signal(); - }).handle_exception([si] (auto ep) { + }).handle_exception([&si] (auto ep) { // There might be larger number of STREAM_MUTATION inflight. // Log one error per column_family per range if (!si->error_logged) { @@ -108,17 +112,16 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) { } future<> send_mutations(auto si) { - auto& cf = si->db.find_column_family(si->cf_id); - return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) { - return repeat([si, &reader] () { - return reader().then([si] (auto smopt) { + return do_with(std::move(si), [] (auto& si) { + return repeat([&si] () { + return si->reader().then([&si] (auto smopt) { if (smopt && si->db.column_family_exists(si->cf_id)) { size_t fragment_size = default_frozen_fragment_size; // Mutations cannot be sent fragmented if the receiving side doesn't support that. 
if (!service::get_local_storage_service().cluster_supports_large_partitions()) { fragment_size = std::numeric_limits::max(); } - return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) { + return fragment_and_freeze(std::move(*smopt), [&si] (auto fm, bool fragmented) { si->mutations_nr++; return do_send_mutations(si, std::move(fm), fragmented); }, fragment_size).then([] { return stop_iteration::no; }); @@ -126,9 +129,9 @@ future<> send_mutations(auto si) { return make_ready_future(stop_iteration::yes); } }); + }).then([&si] { + return si->mutations_done.wait(si->mutations_nr); }); - }).then([si] { - return si->mutations_done.wait(si->mutations_nr); }); } @@ -149,9 +152,23 @@ void stream_transfer_task::start() { [this, plan_id, cf_id, id, dst_cpu_id, pr] (unsigned shard) { sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}, invoke_on shard={}", plan_id, cf_id, shard); return this->session->get_db().invoke_on(shard, [plan_id, cf_id, id, dst_cpu_id, pr] (database& db) { + auto si = make_foreign(make_shared(db, plan_id, cf_id, pr, id, dst_cpu_id)); + return make_ready_future>>(std::move(si)); + }).then([this, shard] (auto si) { + this->_send_infos.emplace(shard, std::move(si)); + return make_ready_future<>(); + }); + }); + }).then([this, plan_id, cf_id, id] { + auto&cf = this->session->get_local_db().find_column_family(cf_id); + sslog.info("[Stream #{}] stream_transfer_task: cf_id={}, ks={}, cf={}, send_info.size={} to peer {}", + plan_id, cf_id, cf.schema()->ks_name(), cf.schema()->cf_name(), this->_send_infos.size(), id); + return do_for_each(this->_send_infos, [this] (auto& item) mutable { + auto shard = item.first; + auto si = std::move(item.second); + return this->session->get_db().invoke_on(shard, [si = std::move(si)] (database& db) mutable { // Send mutations on related shards, do not capture this - auto si = make_lw_shared(db, plan_id, cf_id, pr, id, dst_cpu_id); - return send_mutations(si); + return send_mutations(std::move(si)); 
}); }); }).then([this, plan_id, cf_id, id] { diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index c1f3712346..f72e27a767 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -48,6 +48,7 @@ namespace streaming { class stream_session; +class send_info; /** * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. @@ -59,6 +60,7 @@ private: // A stream_transfer_task always contains the same range to stream std::vector> _ranges; long _total_size; + std::unordered_multimap>> _send_infos; public: using UUID = utils::UUID; stream_transfer_task(stream_transfer_task&&) = default;