diff --git a/read_context.hh b/read_context.hh index 0c7e5c308b..c59422aae1 100644 --- a/read_context.hh +++ b/read_context.hh @@ -30,6 +30,96 @@ namespace cache { +/* +* Represent a flat reader to the underlying source. +* This reader automatically makes sure that it's up to date with all cache updates +*/ +class autoupdating_underlying_flat_reader final { + row_cache& _cache; + read_context& _read_context; + stdx::optional _reader; + utils::phased_barrier::phase_type _reader_creation_phase; + dht::partition_range _range = { }; + stdx::optional _last_key; + stdx::optional _new_last_key; +public: + autoupdating_underlying_flat_reader(row_cache& cache, read_context& context) + : _cache(cache) + , _read_context(context) + { } + // Reads next partition without changing mutation source snapshot. + future read_next_same_phase() { + _last_key = std::move(_new_last_key); + return (*_reader)().then([this] (auto&& smopt) { + if (smopt) { + _new_last_key = smopt->decorated_key(); + } + return std::move(smopt); + }); + } + future operator()() { + _last_key = std::move(_new_last_key); + auto start = population_range_start(); + auto phase = _cache.phase_of(start); + if (!_reader || _reader_creation_phase != phase) { + if (_last_key) { + auto cmp = dht::ring_position_comparator(*_cache._schema); + auto&& new_range = _range.split_after(*_last_key, cmp); + if (!new_range) { + return make_ready_future(streamed_mutation_opt()); + } + _range = std::move(*new_range); + _last_key = {}; + } + if (_reader) { + ++_cache._tracker._stats.underlying_recreations; + } + auto& snap = _cache.snapshot_for_phase(phase); + _reader = {}; // See issue #2644 + _reader = _cache.create_underlying_reader(_read_context, snap, _range); + _reader_creation_phase = phase; + } + return (*_reader)().then([this] (auto&& smopt) { + if (smopt) { + _new_last_key = smopt->decorated_key(); + } + return std::move(smopt); + }); + } + future<> fast_forward_to(dht::partition_range&& range) { + auto snapshot_and_phase = _cache.snapshot_of(dht::ring_position_view::for_range_start(_range)); + return fast_forward_to(std::move(range), snapshot_and_phase.snapshot, snapshot_and_phase.phase); + } + future<> fast_forward_to(dht::partition_range&& range, mutation_source& snapshot, row_cache::phase_type phase) { + _range = std::move(range); + _last_key = { }; + _new_last_key = { }; + if (_reader) { + if (_reader_creation_phase == phase) { + ++_cache._tracker._stats.underlying_partition_skips; + return _reader->fast_forward_to(_range); + } else { + ++_cache._tracker._stats.underlying_recreations; + _reader = {}; // See issue #2644 + } + } + _reader = _cache.create_underlying_reader(_read_context, snapshot, _range); + _reader_creation_phase = phase; + return make_ready_future<>(); + } + utils::phased_barrier::phase_type creation_phase() const { + assert(_reader); + return _reader_creation_phase; + } + const dht::partition_range& range() const { + return _range; + } + dht::ring_position_view population_range_start() const { + return _last_key ? dht::ring_position_view::for_after_key(*_last_key) + : dht::ring_position_view::for_range_start(_range); + } +}; + /* * Represent a reader to the underlying source. * This reader automatically makes sure that it's up to date with all cache updates diff --git a/row_cache.hh b/row_cache.hh index cb37b21677..9b78c1be90 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -45,6 +45,7 @@ class memtable_entry; namespace cache { +class autoupdating_underlying_flat_reader; class autoupdating_underlying_reader; class cache_streamed_mutation; class read_context; @@ -190,6 +191,7 @@ public: public: friend class row_cache; friend class cache::read_context; + friend class cache::autoupdating_underlying_flat_reader; friend class cache::autoupdating_underlying_reader; friend class cache::cache_streamed_mutation; struct stats { @@ -269,6 +271,7 @@ public: bi::member_hook, bi::constant_time_size, // we need this to have bi::auto_unlink on hooks bi::compare>; + friend class cache::autoupdating_underlying_flat_reader; friend class cache::autoupdating_underlying_reader; friend class single_partition_populating_reader; friend class cache_entry;