sstables: mutation_reader: Let clustering_ranges_walker handle the _fwd_range start

Simplifies the code a bit, but also will make it easier to calculate
the next position we should skip to after forwarding, taking into
consideration both the position forwarded to as well as clustering
ranges of the query. That will be just calling
_ck_ranges_walker->lower_bound() after it is trimmed.
This commit is contained in:
Tomasz Grabiec
2017-04-07 10:32:19 +02:00
parent d056d9c31b
commit bfb6858e55

View File

@@ -142,17 +142,15 @@ private:
stdx::optional<new_mutation> _mutation;
bool _is_mutation_end = true;
position_range _fwd_range = position_range::full(); // Restricts the stream on top of _ck_ranges.
position_in_partition _fwd_end = position_in_partition::after_all_clustered_rows(); // Restricts the stream on top of _ck_ranges_walker.
streamed_mutation::forwarding _fwd;
bool _after_fwd_range_start = false;
// Because of #1203 we may encounter sstables with range tombstones
// placed earlier than expected. We fix the ordering by loading range tombstones
// initially into _range_tombstones, until first row is encountered,
// and then merge the two streams in push_ready_fragments().
//
// _range_tombstones holds only tombstones which are inside _ck_ranges and
// after current _fwd_range.start().
// _range_tombstones holds only tombstones which are relevant for current ranges.
range_tombstone_stream _range_tombstones;
bool _first_row_encountered = false;
public:
@@ -313,28 +311,15 @@ private:
void advance_to(position_in_partition_view pos) {
position_in_partition::less_compare less(*_schema);
auto log = [&] {
sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress);
};
if (!_after_fwd_range_start && less(pos, _fwd_range.start())) {
_skip_in_progress = true;
log();
return;
}
_after_fwd_range_start = true;
if (!less(pos, _fwd_range.end())) {
if (!less(pos, _fwd_end)) {
_out_of_range = true;
_skip_in_progress = false;
log();
return;
} else {
_skip_in_progress = !_ck_ranges_walker->advance_to(pos);
_out_of_range |= _ck_ranges_walker->out_of_range();
}
_skip_in_progress = !pos.is_static_row() && !_ck_ranges_walker->advance_to(pos);
_out_of_range |= _ck_ranges_walker->out_of_range();
log();
sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, pos, _out_of_range, _skip_in_progress);
}
// Assumes that this and other advance_to() overloads are called with monotonic positions.
@@ -343,26 +328,15 @@ private:
auto&& start = rt.position();
auto&& end = rt.end_position();
auto log = [&] {
sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress);
};
if (less(end, _fwd_range.start())) {
_skip_in_progress = true;
log();
return;
}
if (!less(start, _fwd_range.end())) {
if (!less(start, _fwd_end)) {
_out_of_range = true;
_skip_in_progress = false; // It may become in range after next forwarding, so cannot drop it
log();
return;
} else {
_skip_in_progress = !_ck_ranges_walker->advance_to(start, end);
_out_of_range |= _ck_ranges_walker->out_of_range();
}
_skip_in_progress = !_ck_ranges_walker->advance_to(start, end);
_out_of_range |= _ck_ranges_walker->out_of_range();
log();
sstlog.trace("mp_row_consumer {}: advance_to({}) => out_of_range={}, skip_in_progress={}", this, rt, _out_of_range, _skip_in_progress);
}
void advance_to(const mutation_fragment& mf) {
@@ -377,9 +351,8 @@ private:
sstlog.trace("mp_row_consumer {}: set_up_ck_ranges({})", this, pk);
_ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk);
_ck_ranges_walker = clustering_ranges_walker(*_schema, _ck_ranges->ranges());
_fwd_range = _fwd ? position_range::for_static_row() : position_range::full();
_fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows();
_out_of_range = false;
_after_fwd_range_start = false;
_range_tombstones.reset();
_first_row_encountered = false;
}
@@ -736,7 +709,7 @@ public:
}
// Workaround for #1203
if (!_first_row_encountered) {
if (!less(rt_pos, _fwd_range.start()) && _ck_ranges_walker->advance_to(rt_pos, rt.end_position())) {
if (_ck_ranges_walker->advance_to(rt_pos, rt.end_position())) {
_range_tombstones.apply(std::move(rt));
}
return proceed::yes;
@@ -951,7 +924,7 @@ row_consumer::proceed
mp_row_consumer::push_ready_fragments_out_of_range() {
// Emit all range tombstones relevant to the current forwarding range first.
while (!_sm->is_buffer_full()) {
auto mfo = _range_tombstones.get_next(_fwd_range.end());
auto mfo = _range_tombstones.get_next(_fwd_end);
if (!mfo) {
_sm->_end_of_stream = true;
break;
@@ -975,13 +948,23 @@ mp_row_consumer::push_ready_fragments() {
}
future<> mp_row_consumer::fast_forward_to(position_range r) {
_fwd_range = std::move(r);
sstlog.trace("mp_row_consumer {}: fast_forward_to({})", this, r);
_out_of_range = _is_mutation_end;
_after_fwd_range_start = false;
_fwd_end = std::move(r).end();
_range_tombstones.forward_to(_fwd_range.start());
_range_tombstones.forward_to(r.start());
if (_ready && !_ready->relevant_for_range(*_schema, _fwd_range.start())) {
_ck_ranges_walker->trim_front(std::move(r).start());
if (_ck_ranges_walker->out_of_range()) {
_out_of_range = true;
_ready = {};
sstlog.trace("mp_row_consumer {}: no more ranges", this);
return make_ready_future<>();
}
auto start = _ck_ranges_walker->lower_bound();
if (_ready && !_ready->relevant_for_range(*_schema, start)) {
_ready = {};
}
@@ -989,11 +972,11 @@ future<> mp_row_consumer::fast_forward_to(position_range r) {
advance_to(*_in_progress);
}
sstlog.trace("mp_row_consumer {}: fast_forward_to({}) => out_of_range={}, skip_in_progress={}", this, _fwd_range, _out_of_range, _skip_in_progress);
sstlog.trace("mp_row_consumer {}: => out_of_range={}, skip_in_progress={}", this, _out_of_range, _skip_in_progress);
if (!_in_progress || _skip_in_progress) {
sstlog.trace("mp_row_consumer {}: advance_context({})", this, _fwd_range.start());
return _sm->advance_context(_fwd_range.start());
sstlog.trace("mp_row_consumer {}: advance_context({})", this, start);
return _sm->advance_context(start);
}
return make_ready_future<>();