From 77d3e30239a5443df9dddb5e3d0deee7771aae6f Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 7 Apr 2017 14:58:57 +0200 Subject: [PATCH] sstables: mutation_reader: Use index to skip across clustering restrictions Improves scans with clustering restrictions. Before the change, such scans would scan the whole partition. Below are results of a test case from perf_fast_forward which selects a few rows from a large partition using query restrictions (not fast forwarding). Before: stride rows time [s] frags frag/s aio [KiB] blocked dropped idx hit idx miss idx blk cpu 1000000 1 0.000609 1 1642 3 152 2 1 0 1 1 38.0% 500000 2 0.242255 2 8 511 64152 398 4 0 1 1 98.6% 250000 4 0.281592 4 14 749 95832 564 4 0 1 1 98.4% 125000 8 0.328056 8 24 873 111704 657 4 0 1 1 98.4% 62500 16 0.306700 16 52 935 119640 751 4 0 1 1 99.4% After: stride rows time [s] frags frag/s aio [KiB] blocked dropped idx hit idx miss idx blk cpu 1000000 1 0.000711 1 1406 3 152 2 1 0 1 1 42.1% 500000 2 0.000910 2 2197 5 216 3 2 0 1 1 39.2% 250000 4 0.001384 4 2891 9 344 5 4 0 1 1 35.3% 125000 8 0.003197 8 2502 21 728 13 8 0 1 1 53.1% 62500 16 0.006664 16 2401 41 1368 25 16 0 1 1 58.2% --- sstables/partition.cc | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/sstables/partition.cc b/sstables/partition.cc index 240b8b157b..94ebc961b3 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -123,6 +123,9 @@ private: // When set, the fragment pending in _in_progress should not be emitted. bool _skip_in_progress = false; + // The value of _ck_ranges_walker->lower_bound_change_counter() last time we tried to skip to _ck_ranges_walker->lower_bound(). + size_t _last_lower_bound_counter = 0; + // We don't have "end of clustering row" markers. So we know that the current // row has ended once we get something (e.g. a live cell) that belongs to another // one. 
If that happens sstable reader is interrupted (proceed::no) but we @@ -351,6 +354,7 @@ private: sstlog.trace("mp_row_consumer {}: set_up_ck_ranges({})", this, pk); _ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk); _ck_ranges_walker = clustering_ranges_walker(*_schema, _ck_ranges->ranges()); + _last_lower_bound_counter = 0; _fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows(); _out_of_range = false; _range_tombstones.reset(); @@ -433,10 +437,13 @@ public: ret = flush(); } advance_to(rt); + _in_progress = mutation_fragment(std::move(rt)); if (_out_of_range) { ret = push_ready_fragments_out_of_range(); } - _in_progress = mutation_fragment(std::move(rt)); + if (needs_skip()) { + ret = proceed::no; + } return ret; } @@ -458,14 +465,17 @@ public: } if (!_in_progress) { advance_to(pos); - if (_out_of_range) { - ret = push_ready_fragments_out_of_range(); - } if (is_static) { _in_progress = mutation_fragment(static_row()); } else { _in_progress = mutation_fragment(clustering_row(std::move(pos.key()))); } + if (_out_of_range) { + ret = push_ready_fragments_out_of_range(); + } + if (needs_skip()) { + ret = proceed::no; + } } return ret; } @@ -786,6 +796,15 @@ public: // must be after it. // future<> fast_forward_to(position_range); + + bool needs_skip() const { + return (_skip_in_progress || !_in_progress) + && _last_lower_bound_counter != _ck_ranges_walker->lower_bound_change_counter(); + } + + // Tries to fast forward the consuming context to the next position. + // Must be called outside consuming context. 
+ future<> maybe_skip(); }; struct sstable_data_source { @@ -861,7 +880,9 @@ public: if (is_buffer_full() || is_end_of_stream()) { return make_ready_future<>(); } - return _ds->_context.read(); + return _ds->_consumer.maybe_skip().then([this] { + return _ds->_context.read(); + }); } future<> fast_forward_to(position_range range) override { @@ -989,9 +1010,20 @@ future<> mp_row_consumer::fast_forward_to(position_range r) { } sstlog.trace("mp_row_consumer {}: advance_context({})", this, start); + _last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter(); return _sm->advance_context(start); } +future<> mp_row_consumer::maybe_skip() { + if (!needs_skip()) { + return make_ready_future<>(); + } + _last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter(); + auto pos = _ck_ranges_walker->lower_bound(); + sstlog.trace("mp_row_consumer {}: advance_context({})", this, pos); + return _sm->advance_context(pos); +} + static int adjust_binary_search_index(int idx) { if (idx < 0) { // binary search gives us the first index _greater_ than the key searched for,