From fa2c26342ca53ca595238cd210aa433cd2d8b7e6 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 12 Sep 2017 19:03:51 +0200 Subject: [PATCH] row_cache: Handle eviction in partition reader --- cache_streamed_mutation.hh | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/cache_streamed_mutation.hh b/cache_streamed_mutation.hh index ef393dc7ad..a6b9b1def6 100644 --- a/cache_streamed_mutation.hh +++ b/cache_streamed_mutation.hh @@ -205,10 +205,15 @@ future<> cache_streamed_mutation::do_fill_buffer() { return read_from_underlying(); } return _lsa_manager.run_in_read_section([this] { - auto same_pos = _next_row.maybe_refresh(); - // FIXME: If continuity changed anywhere between _lower_bound and _next_row.position() - // we need to redo the lookup with _lower_bound. There is no eviction yet, so not yet a problem. - assert(same_pos); + // We assume that if there was eviction, and thus the range may + // no longer be continuous, the cursor was invalidated. + if (!_next_row.up_to_date()) { + auto adjacent = _next_row.advance_to(_lower_bound); + _next_row_in_range = !after_current_range(_next_row.position()); + if (!adjacent && !_next_row.continuous()) { + return start_reading_from_underlying(); + } + } while (!is_buffer_full() && !_end_of_stream && !_reading_underlying) { future<> f = copy_from_cache_to_buffer(); if (!f.available() || need_preempt()) { @@ -232,7 +237,14 @@ future<> cache_streamed_mutation::read_from_underlying() { _reading_underlying = false; return _lsa_manager.run_in_update_section([this] { auto same_pos = _next_row.maybe_refresh(); - assert(same_pos); // FIXME: handle eviction + if (!same_pos) { + _read_context->cache().on_mispopulate(); // FIXME: Insert dummy entry at _upper_bound. + _next_row_in_range = !after_current_range(_next_row.position()); + if (!_next_row.continuous()) { + return start_reading_from_underlying(); + } + return make_ready_future<>(); + } if (_next_row_in_range) { maybe_update_continuity(); add_to_buffer(_next_row);