Yield execution in mutation_result_merger

mutation_result_merger::get can run for a long time. Make it yield
execution from time to time.

Message-Id: <1456674046-14502-1-git-send-email-gleb@scylladb.com>
This commit is contained in:
Gleb Natapov
2016-02-28 17:40:46 +02:00
committed by Avi Kivity
parent 182e6eb89b
commit 22d2b9a2dc

View File

@@ -2851,19 +2851,22 @@ public:
}
}
reconcilable_result get() && {
std::vector<partition> partitions;
uint32_t row_count = 0;
future<reconcilable_result> get() {
auto cmp = [this] (const partition_run& r1, const partition_run& r2) {
const partition& p1 = r1.current();
const partition& p2 = r2.current();
return p1._m.key(*_schema).ring_order_tri_compare(*_schema, p2._m.key(*_schema)) > 0;
};
if (_runs.empty()) {
return make_ready_future<reconcilable_result>(reconcilable_result(0, std::vector<partition>()));
}
boost::range::make_heap(_runs, cmp);
while (!_runs.empty()) {
return repeat_until_value([this, cmp = std::move(cmp), partitions = std::vector<partition>(), row_count = 0u] () mutable {
std::experimental::optional<reconcilable_result> ret;
boost::range::pop_heap(_runs, cmp);
partition_run& next = _runs.back();
const partition& p = next.current();
@@ -2881,18 +2884,19 @@ public:
partitions.push_back(p);
row_count += p._row_count;
}
if (row_count >= _cmd->row_limit) {
break;
if (row_count < _cmd->row_limit) {
next.advance();
if (next.has_more()) {
boost::range::push_heap(_runs, cmp);
} else {
_runs.pop_back();
}
}
next.advance();
if (next.has_more()) {
boost::range::push_heap(_runs, cmp);
} else {
_runs.pop_back();
}
}
return { row_count, std::move(partitions) };
if (_runs.empty() || row_count >= _cmd->row_limit) {
ret = reconcilable_result(row_count, std::move(partitions));
}
return make_ready_future<std::experimental::optional<reconcilable_result>>(std::move(ret));
});
}
};