combined_mutation_reader: fix fast-fowarding related row-skipping bug
When fast forwarding is enabled and all readers positioned inside the current partition return EOS, return EOS from the combined-reader too. Instead of skipping to the next partition if there are idle readers (positioned at some later partition) available. This will cause rows to be skipped in some cases. The fix is to distinguish EOS'd readers that are only halted (waiting for a fast-forward) from thoose really out of data. To achieve this we track the last fragment-kind the reader emitted. If that was a partition-end then the reader is out of data, otherwise it might emit more fragments after a fast-forward. Without this additional information it is impossible to determine why a reader reached EOS and the code later may make the wrong decision about whether the combined-reader as a whole is at EOS or not. Also when fast-forwarding between partition-ranges or calling next_partition() we set the last fragment-kind of forwarded readers because they should emit a partition-start, otherwise they are out of data. Signed-off-by: Botond Dénes <bdenes@scylladb.com> Message-Id: <6f0b21b1ec62e1197de6b46510d5508cdb4a6977.1512569218.git.bdenes@scylladb.com>
This commit is contained in:
@@ -69,7 +69,7 @@ void mutation_reader_merger::add_readers(std::vector<flat_mutation_reader> new_r
|
||||
for (auto&& new_reader : new_readers) {
|
||||
_all_readers.emplace_back(std::move(new_reader));
|
||||
auto* r = &_all_readers.back();
|
||||
_next.push_back(r);
|
||||
_next.emplace_back(r, mutation_fragment::kind::partition_end);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,25 +108,27 @@ struct mutation_reader_merger::fragment_heap_compare {
|
||||
};
|
||||
|
||||
future<> mutation_reader_merger::prepare_next() {
|
||||
return parallel_for_each(_next, [this] (flat_mutation_reader* mr) {
|
||||
return (*mr)().then([this, mr] (mutation_fragment_opt mfo) {
|
||||
return parallel_for_each(_next, [this] (reader_and_last_fragment_kind rk) {
|
||||
return (*rk.reader)().then([this, rk] (mutation_fragment_opt mfo) {
|
||||
if (mfo) {
|
||||
if (mfo->is_partition_start()) {
|
||||
_reader_heap.emplace_back(mr, std::move(*mfo));
|
||||
_reader_heap.emplace_back(rk.reader, std::move(*mfo));
|
||||
boost::push_heap(_reader_heap, reader_heap_compare(*_schema));
|
||||
} else {
|
||||
_fragment_heap.emplace_back(mr, std::move(*mfo));
|
||||
_fragment_heap.emplace_back(rk.reader, std::move(*mfo));
|
||||
boost::range::push_heap(_fragment_heap, fragment_heap_compare(*_schema));
|
||||
}
|
||||
} else if (_fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
} else if (_fwd_sm == streamed_mutation::forwarding::yes && rk.last_kind != mutation_fragment::kind::partition_end) {
|
||||
// When in streamed_mutation::forwarding mode we need
|
||||
// to keep track of readers that returned
|
||||
// end-of-stream to know what readers to ff. We can't
|
||||
// just ff all readers as we might drop fragments from
|
||||
// partitions we haven't even read yet.
|
||||
_halted_readers.push_back(mr);
|
||||
// Readers whoose last emitted fragment was a partition
|
||||
// end are out of data for good for the current range.
|
||||
_halted_readers.push_back(rk);
|
||||
} else if (_fwd_mr == mutation_reader::forwarding::no) {
|
||||
_all_readers.remove_if([mr] (auto& r) { return &r == mr; });
|
||||
_all_readers.remove_if([mr = rk.reader] (auto& r) { return &r == mr; });
|
||||
}
|
||||
});
|
||||
}).then([this] {
|
||||
@@ -152,7 +154,7 @@ void mutation_reader_merger::prepare_forwardable_readers() {
|
||||
|
||||
std::move(_halted_readers.begin(), _halted_readers.end(), std::back_inserter(_next));
|
||||
for (auto& df : _fragment_heap) {
|
||||
_next.emplace_back(df.reader);
|
||||
_next.emplace_back(df.reader, df.fragment.mutation_fragment_kind());
|
||||
}
|
||||
|
||||
_halted_readers.clear();
|
||||
@@ -180,7 +182,7 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
|
||||
// If we ran out of fragments for the current partition, select the
|
||||
// readers for the next one.
|
||||
if (_fragment_heap.empty()) {
|
||||
if (_reader_heap.empty()) {
|
||||
if (!_halted_readers.empty() || _reader_heap.empty()) {
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
}
|
||||
|
||||
@@ -203,7 +205,7 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
|
||||
boost::range::pop_heap(_fragment_heap, fragment_heap_compare(*_schema));
|
||||
auto& n = _fragment_heap.back();
|
||||
_current.emplace_back(std::move(n.fragment));
|
||||
_next.emplace_back(n.reader);
|
||||
_next.emplace_back(n.reader, n.fragment.mutation_fragment_kind());
|
||||
_fragment_heap.pop_back();
|
||||
}
|
||||
while (!_fragment_heap.empty() && equal(_current.back().position(), _fragment_heap.front().fragment.position()));
|
||||
@@ -213,8 +215,9 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
|
||||
|
||||
void mutation_reader_merger::next_partition() {
|
||||
prepare_forwardable_readers();
|
||||
for (auto& r : _next) {
|
||||
r->next_partition();
|
||||
for (auto& rk : _next) {
|
||||
rk.last_kind = mutation_fragment::kind::partition_end;
|
||||
rk.reader->next_partition();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,7 +228,7 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr)
|
||||
_reader_heap.clear();
|
||||
|
||||
return parallel_for_each(_all_readers, [this, &pr] (flat_mutation_reader& mr) {
|
||||
_next.push_back(&mr);
|
||||
_next.emplace_back(&mr, mutation_fragment::kind::partition_end);
|
||||
return mr.fast_forward_to(pr);
|
||||
}).then([this, &pr] {
|
||||
add_readers(_selector->fast_forward_to(pr));
|
||||
@@ -234,8 +237,8 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr)
|
||||
|
||||
future<> mutation_reader_merger::fast_forward_to(position_range pr) {
|
||||
prepare_forwardable_readers();
|
||||
return parallel_for_each(_next, [this, pr = std::move(pr)] (flat_mutation_reader* mr) {
|
||||
return mr->fast_forward_to(pr);
|
||||
return parallel_for_each(_next, [this, pr = std::move(pr)] (reader_and_last_fragment_kind rk) {
|
||||
return rk.reader->fast_forward_to(pr);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -271,6 +271,16 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
struct reader_and_last_fragment_kind {
|
||||
flat_mutation_reader* reader;
|
||||
mutation_fragment::kind last_kind;
|
||||
|
||||
reader_and_last_fragment_kind(flat_mutation_reader* r, mutation_fragment::kind k)
|
||||
: reader(r)
|
||||
, last_kind(k) {
|
||||
}
|
||||
};
|
||||
|
||||
using mutation_fragment_batch = boost::iterator_range<std::vector<mutation_fragment>::iterator>;
|
||||
private:
|
||||
struct reader_heap_compare;
|
||||
@@ -287,9 +297,9 @@ private:
|
||||
// Readers and their current fragments, belonging to the current
|
||||
// partition.
|
||||
std::vector<reader_and_fragment> _fragment_heap;
|
||||
std::vector<flat_mutation_reader*> _next;
|
||||
std::vector<reader_and_last_fragment_kind> _next;
|
||||
// Readers that reached EOS.
|
||||
std::vector<flat_mutation_reader*> _halted_readers;
|
||||
std::vector<reader_and_last_fragment_kind> _halted_readers;
|
||||
std::vector<mutation_fragment> _current;
|
||||
dht::decorated_key_opt _key;
|
||||
const schema_ptr _schema;
|
||||
|
||||
Reference in New Issue
Block a user