From 22019577ccb80df41026feab9cdf5bdaaa36eefa Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 15 Sep 2017 15:09:10 +0200 Subject: [PATCH] cache_streamed_mutation: Call fast_forward_to() outside allocating section On bad_alloc the section is retried. If the exception happened inside fast_forward_to() on the underlying reader, that call will be retried. However, the reader should not be used after exception is thrown, since it is in unspecified state. Also, calling fast_forward_to() with cache region locked increases the chances of it failing to allocate. We shouldn't call fast_forward_to() with the cache region locked. Fixes #2791. --- cache_streamed_mutation.hh | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/cache_streamed_mutation.hh b/cache_streamed_mutation.hh index 00d1550b89..3e5c6ff36d 100644 --- a/cache_streamed_mutation.hh +++ b/cache_streamed_mutation.hh @@ -78,6 +78,12 @@ class cache_streamed_mutation final : public streamed_mutation::impl { // - _next_row_in_range = _next.position() < _upper_bound reading_from_cache, + // Starts reading from underlying reader. + // The range to read is position_range(_lower_bound, min(_next_row.position(), _upper_bound)). + // Invariants: + // - _next_row_in_range = _next.position() < _upper_bound + move_to_underlying, + // Invariants: // - Upper bound of the read is min(_next_row.position(), _upper_bound) // - _next_row_in_range = _next.position() < _upper_bound @@ -218,6 +224,14 @@ future<> cache_streamed_mutation::fill_buffer() { inline future<> cache_streamed_mutation::do_fill_buffer() { + if (_state == state::move_to_underlying) { + _state = state::reading_from_underlying; + auto end = _next_row_in_range ? position_in_partition(_next_row.position()) + : position_in_partition(_upper_bound); + return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}).then([this] { + return read_from_underlying(); + }); + } if (_state == state::reading_from_underlying) { return read_from_underlying(); } @@ -364,10 +378,8 @@ bool cache_streamed_mutation::after_current_range(position_in_partition_view p) inline future<> cache_streamed_mutation::start_reading_from_underlying() { - _state = state::reading_from_underlying; - auto end = _next_row_in_range ? position_in_partition(_next_row.position()) - : position_in_partition(_upper_bound); - return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}); + _state = state::move_to_underlying; + return make_ready_future<>(); } inline