streaming: Futurize estimate_partitions

The loop can take a long time if the number of sstables and/or ranges
are large. To fix, futurize the loop.

Fixes: #4005

Message-Id: <3b05cb84f3f57cc566702142c6365a04b075018e.1545290730.git.asias@scylladb.com>
This commit is contained in:
Asias He
2018-12-20 15:26:33 +08:00
committed by Avi Kivity
parent 385d74db01
commit bcba6b4f4d

View File

@@ -104,14 +104,16 @@ struct send_info {
, prs(to_partition_ranges(ranges))
, reader(cf.make_streaming_reader(cf.schema(), prs)) {
}
size_t estimate_partitions() {
auto sstables = cf.get_sstables();
size_t partition_count = 0;
for (auto& range : ranges) {
partition_count = boost::accumulate(*sstables, uint64_t(0),
[&range] (uint64_t x, auto&& sst) { return x + sst->estimated_keys_for_range(range); });
}
return partition_count;
future<size_t> estimate_partitions() {
return do_with(cf.get_sstables(), size_t(0), [this] (auto& sstables, size_t& partition_count) {
return do_for_each(*sstables, [this, &partition_count] (auto& sst) {
return do_for_each(ranges, [this, &sst, &partition_count] (auto& range) {
partition_count += sst->estimated_keys_for_range(range);
});
}).then([&partition_count] {
return partition_count;
});
});
}
};
@@ -156,7 +158,7 @@ future<> send_mutations(lw_shared_ptr<send_info> si) {
}
future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
size_t estimated_partitions = si->estimate_partitions();
return si->estimate_partitions().then([si] (size_t estimated_partitions) {
sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions);
return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink<frozen_mutation_fragment> sink, rpc::source<int32_t> source) mutable {
auto got_error_from_peer = make_lw_shared<bool>(false);
@@ -196,6 +198,7 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
return when_all_succeed(std::move(source_op), std::move(sink_op));
});
});
}
future<> stream_transfer_task::execute() {