combined_mutation_reader: fix fast-forwarding related row-skipping bug

When fast forwarding is enabled and all readers positioned inside the
current partition return EOS, return EOS from the combined-reader
too, instead of skipping to the next partition if there are idle readers
(positioned at some later partition) available. Skipping to the next
partition in that situation causes rows to be skipped in some cases.

The fix is to distinguish EOS'd readers that are only halted (waiting
for a fast-forward) from those really out of data. To achieve this we
track the last fragment-kind the reader emitted. If that was a
partition-end then the reader is out of data, otherwise it might emit
more fragments after a fast-forward. Without this additional information
it is impossible to determine why a reader reached EOS and the code
later may make the wrong decision about whether the combined-reader as
a whole is at EOS or not.
Also, when fast-forwarding between partition-ranges or calling
next_partition(), we set the last fragment-kind of the forwarded readers
to partition-end, because they should emit a partition-start next;
otherwise they are out of data.

Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <6f0b21b1ec62e1197de6b46510d5508cdb4a6977.1512569218.git.bdenes@scylladb.com>
This commit is contained in:
Botond Dénes
2017-12-06 16:07:02 +02:00
committed by Avi Kivity
parent aeb6ebce5a
commit 9661769313
2 changed files with 31 additions and 18 deletions

View File

@@ -69,7 +69,7 @@ void mutation_reader_merger::add_readers(std::vector<flat_mutation_reader> new_r
for (auto&& new_reader : new_readers) {
_all_readers.emplace_back(std::move(new_reader));
auto* r = &_all_readers.back();
_next.push_back(r);
_next.emplace_back(r, mutation_fragment::kind::partition_end);
}
}
@@ -108,25 +108,27 @@ struct mutation_reader_merger::fragment_heap_compare {
};
future<> mutation_reader_merger::prepare_next() {
return parallel_for_each(_next, [this] (flat_mutation_reader* mr) {
return (*mr)().then([this, mr] (mutation_fragment_opt mfo) {
return parallel_for_each(_next, [this] (reader_and_last_fragment_kind rk) {
return (*rk.reader)().then([this, rk] (mutation_fragment_opt mfo) {
if (mfo) {
if (mfo->is_partition_start()) {
_reader_heap.emplace_back(mr, std::move(*mfo));
_reader_heap.emplace_back(rk.reader, std::move(*mfo));
boost::push_heap(_reader_heap, reader_heap_compare(*_schema));
} else {
_fragment_heap.emplace_back(mr, std::move(*mfo));
_fragment_heap.emplace_back(rk.reader, std::move(*mfo));
boost::range::push_heap(_fragment_heap, fragment_heap_compare(*_schema));
}
} else if (_fwd_sm == streamed_mutation::forwarding::yes) {
} else if (_fwd_sm == streamed_mutation::forwarding::yes && rk.last_kind != mutation_fragment::kind::partition_end) {
// When in streamed_mutation::forwarding mode we need
// to keep track of readers that returned
// end-of-stream to know what readers to ff. We can't
// just ff all readers as we might drop fragments from
// partitions we haven't even read yet.
_halted_readers.push_back(mr);
// Readers whose last emitted fragment was a partition
// end are out of data for good for the current range.
_halted_readers.push_back(rk);
} else if (_fwd_mr == mutation_reader::forwarding::no) {
_all_readers.remove_if([mr] (auto& r) { return &r == mr; });
_all_readers.remove_if([mr = rk.reader] (auto& r) { return &r == mr; });
}
});
}).then([this] {
@@ -152,7 +154,7 @@ void mutation_reader_merger::prepare_forwardable_readers() {
std::move(_halted_readers.begin(), _halted_readers.end(), std::back_inserter(_next));
for (auto& df : _fragment_heap) {
_next.emplace_back(df.reader);
_next.emplace_back(df.reader, df.fragment.mutation_fragment_kind());
}
_halted_readers.clear();
@@ -180,7 +182,7 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
// If we ran out of fragments for the current partition, select the
// readers for the next one.
if (_fragment_heap.empty()) {
if (_reader_heap.empty()) {
if (!_halted_readers.empty() || _reader_heap.empty()) {
return make_ready_future<mutation_fragment_batch>(_current);
}
@@ -203,7 +205,7 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
boost::range::pop_heap(_fragment_heap, fragment_heap_compare(*_schema));
auto& n = _fragment_heap.back();
_current.emplace_back(std::move(n.fragment));
_next.emplace_back(n.reader);
_next.emplace_back(n.reader, n.fragment.mutation_fragment_kind());
_fragment_heap.pop_back();
}
while (!_fragment_heap.empty() && equal(_current.back().position(), _fragment_heap.front().fragment.position()));
@@ -213,8 +215,9 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
void mutation_reader_merger::next_partition() {
prepare_forwardable_readers();
for (auto& r : _next) {
r->next_partition();
for (auto& rk : _next) {
rk.last_kind = mutation_fragment::kind::partition_end;
rk.reader->next_partition();
}
}
@@ -225,7 +228,7 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr)
_reader_heap.clear();
return parallel_for_each(_all_readers, [this, &pr] (flat_mutation_reader& mr) {
_next.push_back(&mr);
_next.emplace_back(&mr, mutation_fragment::kind::partition_end);
return mr.fast_forward_to(pr);
}).then([this, &pr] {
add_readers(_selector->fast_forward_to(pr));
@@ -234,8 +237,8 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr)
future<> mutation_reader_merger::fast_forward_to(position_range pr) {
prepare_forwardable_readers();
return parallel_for_each(_next, [this, pr = std::move(pr)] (flat_mutation_reader* mr) {
return mr->fast_forward_to(pr);
return parallel_for_each(_next, [this, pr = std::move(pr)] (reader_and_last_fragment_kind rk) {
return rk.reader->fast_forward_to(pr);
});
}

View File

@@ -271,6 +271,16 @@ public:
}
};
struct reader_and_last_fragment_kind {
flat_mutation_reader* reader;
mutation_fragment::kind last_kind;
reader_and_last_fragment_kind(flat_mutation_reader* r, mutation_fragment::kind k)
: reader(r)
, last_kind(k) {
}
};
using mutation_fragment_batch = boost::iterator_range<std::vector<mutation_fragment>::iterator>;
private:
struct reader_heap_compare;
@@ -287,9 +297,9 @@ private:
// Readers and their current fragments, belonging to the current
// partition.
std::vector<reader_and_fragment> _fragment_heap;
std::vector<flat_mutation_reader*> _next;
std::vector<reader_and_last_fragment_kind> _next;
// Readers that reached EOS.
std::vector<flat_mutation_reader*> _halted_readers;
std::vector<reader_and_last_fragment_kind> _halted_readers;
std::vector<mutation_fragment> _current;
dht::decorated_key_opt _key;
const schema_ptr _schema;