sstables: mutation_reader: Use index to skip across clustering restrictions

Improves scans with clustering restrictions. Before the change such
scans would scan whole partition.

Below are results of a test case from perf_fast_forward which selects
few rows from a large partition using query restrictions (not fast forwarding).

Before:

  stride  rows      time [s]     frags     frag/s    aio      [KiB] blocked dropped  idx hit idx miss  idx blk    cpu
  1000000 1         0.000609         1       1642      3        152       2       1        0        1        1  38.0%
  500000  2         0.242255         2          8    511      64152     398       4        0        1        1  98.6%
  250000  4         0.281592         4         14    749      95832     564       4        0        1        1  98.4%
  125000  8         0.328056         8         24    873     111704     657       4        0        1        1  98.4%
  62500   16        0.306700        16         52    935     119640     751       4        0        1        1  99.4%

After:

  stride  rows      time [s]     frags     frag/s    aio      [KiB] blocked dropped  idx hit idx miss  idx blk    cpu
  1000000 1         0.000711         1       1406      3        152       2       1        0        1        1  42.1%
  500000  2         0.000910         2       2197      5        216       3       2        0        1        1  39.2%
  250000  4         0.001384         4       2891      9        344       5       4        0        1        1  35.3%
  125000  8         0.003197         8       2502     21        728      13       8        0        1        1  53.1%
  62500   16        0.006664        16       2401     41       1368      25      16        0        1        1  58.2%
This commit is contained in:
Tomasz Grabiec
2017-04-07 14:58:57 +02:00
parent 05a1f92cbc
commit 77d3e30239

View File

@@ -123,6 +123,9 @@ private:
// When set, the fragment pending in _in_progress should not be emitted.
bool _skip_in_progress = false;
// The value of _ck_ranges->lower_bound_counter() last time we tried to skip to _ck_ranges->lower_bound().
size_t _last_lower_bound_counter = 0;
// We don't have "end of clustering row" markers. So we know that the current
// row has ended once we get something (e.g. a live cell) that belongs to another
// one. If that happens sstable reader is interrupted (proceed::no) but we
@@ -351,6 +354,7 @@ private:
sstlog.trace("mp_row_consumer {}: set_up_ck_ranges({})", this, pk);
_ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk);
_ck_ranges_walker = clustering_ranges_walker(*_schema, _ck_ranges->ranges());
_last_lower_bound_counter = 0;
_fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows();
_out_of_range = false;
_range_tombstones.reset();
@@ -433,10 +437,13 @@ public:
ret = flush();
}
advance_to(rt);
_in_progress = mutation_fragment(std::move(rt));
if (_out_of_range) {
ret = push_ready_fragments_out_of_range();
}
_in_progress = mutation_fragment(std::move(rt));
if (needs_skip()) {
ret = proceed::no;
}
return ret;
}
@@ -458,14 +465,17 @@ public:
}
if (!_in_progress) {
advance_to(pos);
if (_out_of_range) {
ret = push_ready_fragments_out_of_range();
}
if (is_static) {
_in_progress = mutation_fragment(static_row());
} else {
_in_progress = mutation_fragment(clustering_row(std::move(pos.key())));
}
if (_out_of_range) {
ret = push_ready_fragments_out_of_range();
}
if (needs_skip()) {
ret = proceed::no;
}
}
return ret;
}
@@ -786,6 +796,15 @@ public:
// must be after it.
//
future<> fast_forward_to(position_range);
bool needs_skip() const {
return (_skip_in_progress || !_in_progress)
&& _last_lower_bound_counter != _ck_ranges_walker->lower_bound_change_counter();
}
// Tries to fast forward the consuming context to the next position.
// Must be called outside consuming context.
future<> maybe_skip();
};
struct sstable_data_source {
@@ -861,7 +880,9 @@ public:
if (is_buffer_full() || is_end_of_stream()) {
return make_ready_future<>();
}
return _ds->_context.read();
return _ds->_consumer.maybe_skip().then([this] {
return _ds->_context.read();
});
}
future<> fast_forward_to(position_range range) override {
@@ -989,9 +1010,20 @@ future<> mp_row_consumer::fast_forward_to(position_range r) {
}
sstlog.trace("mp_row_consumer {}: advance_context({})", this, start);
_last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter();
return _sm->advance_context(start);
}
future<> mp_row_consumer::maybe_skip() {
if (!needs_skip()) {
return make_ready_future<>();
}
_last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter();
auto pos = _ck_ranges_walker->lower_bound();
sstlog.trace("mp_row_consumer {}: advance_context({})", this, pos);
return _sm->advance_context(pos);
}
static int adjust_binary_search_index(int idx) {
if (idx < 0) {
// binary search gives us the first index _greater_ than the key searched for,