read_context: create a copy of autoupdating_underlying_reader

called autoupdating_underlying_flat_reader. It will be modified
in the next patch to use flat reader to underlying.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2017-12-04 16:35:43 +01:00
parent bf4e1c0c54
commit 77b6f7c599
2 changed files with 93 additions and 0 deletions

View File

@@ -30,6 +30,96 @@
namespace cache {
/*
* Represent a flat reader to the underlying source.
* This reader automatically makes sure that it's up to date with all cache updates
*/
class autoupdating_underlying_flat_reader final {
row_cache& _cache;
read_context& _read_context;
stdx::optional<mutation_reader> _reader;
utils::phased_barrier::phase_type _reader_creation_phase;
dht::partition_range _range = { };
stdx::optional<dht::decorated_key> _last_key;
stdx::optional<dht::decorated_key> _new_last_key;
public:
autoupdating_underlying_flat_reader(row_cache& cache, read_context& context)
: _cache(cache)
, _read_context(context)
{ }
// Reads next partition without changing mutation source snapshot.
future<streamed_mutation_opt> read_next_same_phase() {
_last_key = std::move(_new_last_key);
return (*_reader)().then([this] (auto&& smopt) {
if (smopt) {
_new_last_key = smopt->decorated_key();
}
return std::move(smopt);
});
}
future<streamed_mutation_opt> operator()() {
_last_key = std::move(_new_last_key);
auto start = population_range_start();
auto phase = _cache.phase_of(start);
if (!_reader || _reader_creation_phase != phase) {
if (_last_key) {
auto cmp = dht::ring_position_comparator(*_cache._schema);
auto&& new_range = _range.split_after(*_last_key, cmp);
if (!new_range) {
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
}
_range = std::move(*new_range);
_last_key = {};
}
if (_reader) {
++_cache._tracker._stats.underlying_recreations;
}
auto& snap = _cache.snapshot_for_phase(phase);
_reader = {}; // See issue #2644
_reader = _cache.create_underlying_reader(_read_context, snap, _range);
_reader_creation_phase = phase;
}
return (*_reader)().then([this] (auto&& smopt) {
if (smopt) {
_new_last_key = smopt->decorated_key();
}
return std::move(smopt);
});
}
future<> fast_forward_to(dht::partition_range&& range) {
auto snapshot_and_phase = _cache.snapshot_of(dht::ring_position_view::for_range_start(_range));
return fast_forward_to(std::move(range), snapshot_and_phase.snapshot, snapshot_and_phase.phase);
}
future<> fast_forward_to(dht::partition_range&& range, mutation_source& snapshot, row_cache::phase_type phase) {
_range = std::move(range);
_last_key = { };
_new_last_key = { };
if (_reader) {
if (_reader_creation_phase == phase) {
++_cache._tracker._stats.underlying_partition_skips;
return _reader->fast_forward_to(_range);
} else {
++_cache._tracker._stats.underlying_recreations;
_reader = {}; // See issue #2644
}
}
_reader = _cache.create_underlying_reader(_read_context, snapshot, _range);
_reader_creation_phase = phase;
return make_ready_future<>();
}
utils::phased_barrier::phase_type creation_phase() const {
assert(_reader);
return _reader_creation_phase;
}
const dht::partition_range& range() const {
return _range;
}
dht::ring_position_view population_range_start() const {
return _last_key ? dht::ring_position_view::for_after_key(*_last_key)
: dht::ring_position_view::for_range_start(_range);
}
};
/*
* Represent a reader to the underlying source.
* This reader automatically makes sure that it's up to date with all cache updates

View File

@@ -45,6 +45,7 @@ class memtable_entry;
namespace cache {
class autoupdating_underlying_flat_reader;
class autoupdating_underlying_reader;
class cache_streamed_mutation;
class read_context;
@@ -190,6 +191,7 @@ public:
public:
friend class row_cache;
friend class cache::read_context;
friend class cache::autoupdating_underlying_flat_reader;
friend class cache::autoupdating_underlying_reader;
friend class cache::cache_streamed_mutation;
struct stats {
@@ -269,6 +271,7 @@ public:
bi::member_hook<cache_entry, cache_entry::cache_link_type, &cache_entry::_cache_link>,
bi::constant_time_size<false>, // we need this to have bi::auto_unlink on hooks
bi::compare<cache_entry::compare>>;
friend class cache::autoupdating_underlying_flat_reader;
friend class cache::autoupdating_underlying_reader;
friend class single_partition_populating_reader;
friend class cache_entry;