From bcba6b4f4d8c9fd45ec5a76ae9f9f3792221300a Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 20 Dec 2018 15:26:33 +0800 Subject: [PATCH] streaming: Futurize estimate_partitions The loop can take a long time if the number of sstables and/or ranges are large. To fix, futurize the loop. Fixes: #4005 Message-Id: <3b05cb84f3f57cc566702142c6365a04b075018e.1545290730.git.asias@scylladb.com> --- streaming/stream_transfer_task.cc | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index c4c82a9393..f595eb7702 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -104,14 +104,16 @@ struct send_info { , prs(to_partition_ranges(ranges)) , reader(cf.make_streaming_reader(cf.schema(), prs)) { } - size_t estimate_partitions() { - auto sstables = cf.get_sstables(); - size_t partition_count = 0; - for (auto& range : ranges) { - partition_count = boost::accumulate(*sstables, uint64_t(0), - [&range] (uint64_t x, auto&& sst) { return x + sst->estimated_keys_for_range(range); }); - } - return partition_count; + future estimate_partitions() { + return do_with(cf.get_sstables(), size_t(0), [this] (auto& sstables, size_t& partition_count) { + return do_for_each(*sstables, [this, &partition_count] (auto& sst) { + return do_for_each(ranges, [this, &sst, &partition_count] (auto& range) { + partition_count += sst->estimated_keys_for_range(range); + }); + }).then([&partition_count] { + return partition_count; + }); + }); } }; @@ -156,7 +158,7 @@ future<> send_mutations(lw_shared_ptr si) { } future<> send_mutation_fragments(lw_shared_ptr si) { - size_t estimated_partitions = si->estimate_partitions(); + return si->estimate_partitions().then([si] (size_t estimated_partitions) { sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions); return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { auto got_error_from_peer = make_lw_shared(false); @@ -196,6 +198,7 @@ future<> send_mutation_fragments(lw_shared_ptr si) { return when_all_succeed(std::move(source_op), std::move(sink_op)); }); + }); } future<> stream_transfer_task::execute() {