diff --git a/read_context.hh b/read_context.hh index 3ec1617829..271df73d68 100644 --- a/read_context.hh +++ b/read_context.hh @@ -47,6 +47,16 @@ public: : _cache(cache) , _read_context(context) { } + // Reads next partition without changing mutation source snapshot. + future read_next_same_phase() { + _last_key = std::move(_new_last_key); + return (*_reader)().then([this] (auto&& smopt) { + if (smopt) { + _new_last_key = smopt->decorated_key(); + } + return std::move(smopt); + }); + } future operator()() { _last_key = std::move(_new_last_key); auto start = population_range_start(); @@ -73,14 +83,17 @@ public: }); } future<> fast_forward_to(dht::partition_range&& range) { + auto snapshot_and_phase = _cache.snapshot_of(dht::ring_position_view::for_range_start(_range)); + return fast_forward_to(std::move(range), snapshot_and_phase.snapshot, snapshot_and_phase.phase); + } + future<> fast_forward_to(dht::partition_range&& range, mutation_source& snapshot, row_cache::phase_type phase) { _range = std::move(range); _last_key = { }; _new_last_key = { }; - auto phase = _cache.phase_of(dht::ring_position_view::for_range_start(_range)); if (_reader && _reader_creation_phase == phase) { return _reader->fast_forward_to(_range); } - _reader = _cache.create_underlying_reader(_read_context, _cache.snapshot_for_phase(phase), _range); + _reader = _cache.create_underlying_reader(_read_context, snapshot, _range); _reader_creation_phase = phase; return make_ready_future<>(); }