diff --git a/mutation_reader.cc b/mutation_reader.cc index 3a27414adb..5751361ed7 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1472,18 +1472,18 @@ future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::t _end_of_stream = false; if (_reader) { - return _reader->fast_forward_to(pr, timeout); + co_await _reader->fast_forward_to(pr, timeout); + _range_override.reset(); + co_return; } if (!_reader_created || !_irh) { - return make_ready_future<>(); + co_return; } if (auto reader_opt = try_resume()) { - auto f = reader_opt->fast_forward_to(pr, timeout); - return f.then([this, reader = std::move(*reader_opt)] () mutable { - maybe_pause(std::move(reader)); - }); + co_await reader_opt->fast_forward_to(pr, timeout); + _range_override.reset(); + maybe_pause(std::move(*reader_opt)); } - return make_ready_future<>(); } evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index eb8e9004ff..2fa8d894eb 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3570,6 +3570,113 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { max_buffer_size); } +SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) { + class test_reader : public flat_mutation_reader::impl { + simple_schema _s; + const std::vector _pkeys; + std::vector::const_iterator _it; + std::vector::const_iterator _end; + private: + void on_range_change(const dht::partition_range& pr) { + dht::ring_position_comparator cmp(*_schema); + _it = _pkeys.begin(); + while (_it != _pkeys.end() && !pr.contains(*_it, cmp)) { + ++_it; + } + _end = _it; + while (_end != _pkeys.end() && pr.contains(*_end, cmp)) { + ++_end; + } + } + public: + test_reader(simple_schema s, reader_permit permit, const dht::partition_range& pr, std::vector pkeys) + : impl(s.schema(), std::move(permit)) + , _s(std::move(s)) + , _pkeys(std::move(pkeys)) { + on_range_change(pr); + } + + virtual future<> fill_buffer(db::timeout_clock::time_point) override { + if (_it == _end) { + _end_of_stream = true; + return make_ready_future<>(); + } + + push_mutation_fragment(*_schema, _permit, partition_start(*_it++, {})); + + uint32_t ck = 0; + while (!is_buffer_full()) { + auto ckey = _s.make_ckey(ck); + push_mutation_fragment(*_schema, _permit, _s.make_row(_s.make_ckey(ck++), make_random_string(1024))); + ++ck; + } + + push_mutation_fragment(*_schema, _permit, partition_end()); + return make_ready_future<>(); + } + virtual future<> next_partition() override { + return make_ready_future<>(); + } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point) override { + on_range_change(pr); + clear_buffer(); + _end_of_stream = false; + return make_ready_future<>(); + } + virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + }; + + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + simple_schema s; + auto permit = semaphore.make_permit(s.schema().get(), get_name()); + auto pkeys = s.make_pkeys(6); + boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); + + auto ms = mutation_source([&] (schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr tr, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + std::vector pkeys_with_data; + bool empty = false; + for (const auto& pkey : pkeys) { + empty = !empty; + if (empty) { + pkeys_with_data.push_back(pkey); + } + } + return make_flat_mutation_reader( + s, + std::move(permit), + range, + std::move(pkeys_with_data)); + }); + + auto pr0 = dht::partition_range::make({pkeys[0], true}, {pkeys[3], true}); + auto [reader, handle] = make_manually_paused_evictable_reader(std::move(ms), s.schema(), permit, pr0, s.schema()->full_slice(), + seastar::default_priority_class(), {}, mutation_reader::forwarding::yes); + + auto reader_assert = assert_that(std::move(reader)); + reader_assert.produces(pkeys[0]); + reader_assert.produces(pkeys[2]); + + handle.pause(); + BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); + + reader_assert.produces_end_of_stream(); + + auto pr1 = dht::partition_range::make({pkeys[4], true}, {pkeys[5], true}); + reader_assert.fast_forward_to(pr1); + + // Failure will happen in the form of `on_internal_error()`. + reader_assert.produces(pkeys[4]); +} + struct mutation_bounds { mutation m; position_in_partition lower;