From fdfd1af6941aa55aae68f95ce04b0141caaf4b43 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 28 Jul 2016 12:26:39 +0200 Subject: [PATCH] Use continuity flag correctly with concurrent invalidations Between reading cache entry and actually using it invalidations can happen so we have to check if no flag was cleared if it was we need to read the entry again. Fixes #1464. Signed-off-by: Piotr Jastrzebski Message-Id: <7856b0ded45e42774ccd6f402b5ee42175bd73cf.1469701026.git.piotr@scylladb.com> --- row_cache.cc | 53 +++++++++++++++++++++++++++++++++------- row_cache.hh | 6 +++++ tests/row_cache_test.cc | 54 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 9 deletions(-) diff --git a/row_cache.cc b/row_cache.cc index eb2ca182e8..02e57fdcc3 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -75,7 +75,7 @@ cache_tracker::cache_tracker() { cache_entry& ce = _lru.back(); auto it = row_cache::partitions_type::s_iterator_to(ce); --it; - it->set_continuous(false); + clear_continuity(*it); _lru.pop_back_and_dispose(current_deleter()); --_partitions; ++_evictions; @@ -163,7 +163,7 @@ void cache_tracker::clear() { _lru.erase(_lru.iterator_to(to_remove)); current_deleter()(&to_remove); } - it->set_continuous(false); + clear_continuity(*it); } }); _removals += _partitions; @@ -205,6 +205,10 @@ void cache_tracker::on_uncached_wide_partition() { ++_uncached_wide_partitions; } +void cache_tracker::on_continuity_flag_cleared() { + ++_continuity_flags_cleared; +} + allocation_strategy& cache_tracker::allocator() { return _region.allocator(); } @@ -269,6 +273,11 @@ public: } }; +void cache_tracker::clear_continuity(cache_entry& ce) { + ce.set_continuous(false); + on_continuity_flag_cleared(); +} + void row_cache::on_hit() { _stats.hits.mark(); _tracker.on_hit(); @@ -335,6 +344,7 @@ private: public: struct cache_data { streamed_mutation_opt mut; + uint64_t continuity_flags_cleared; bool continuous; }; just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range, query::clustering_key_filtering_context ck_filtering) @@ -351,7 +361,7 @@ public: ++_it; _last = ce.key(); _cache.upgrade_entry(ce); - cache_data data{std::move(ce.read(_cache, _schema, _ck_filtering)), ce.continuous()}; + cache_data data{std::move(ce.read(_cache, _schema, _ck_filtering)), _cache._tracker.continuity_flags_cleared(), ce.continuous()}; return make_ready_future(std::move(data)); }); }); @@ -520,6 +530,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ just_cache_scanning_reader _primary; last_key _last_key_from_primary; utils::phased_barrier::phase_type _last_key_from_primary_populate_phase; + uint64_t _last_key_from_primary_continuity_flags_cleared; query::clustering_key_filtering_context _ck_filtering; boost::variantkey(), true}; _last_key_from_primary_populate_phase = _cache._populate_phaser.phase(); + _last_key_from_primary_continuity_flags_cleared = _cache._tracker.continuity_flags_cleared(); return _cache._partitions.begin()->continuous(); } const range_bound& bound = bound_opt.value(); @@ -547,6 +559,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ (!bound.is_inclusive() || bound.value().relation_to_keys() == -1)) { _last_key_from_primary = {i->key(), true}; _last_key_from_primary_populate_phase = _cache._populate_phaser.phase(); + _last_key_from_primary_continuity_flags_cleared = _cache._tracker.continuity_flags_cleared(); return i->continuous(); } --i; @@ -578,6 +591,9 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ // We have to capture mutation from data before we change the state because data lives in state // and changing state destroys previous state. streamed_mutation_opt result = std::move(data.mut); + if (_cache._tracker.continuity_flags_cleared() != data.continuity_flags_cleared) { + data.continuous = _cache.has_continuous_entry(*_last_key_from_primary.value); + } if (data.continuous) { _state = after_continuous_entry_state{}; } else { @@ -646,6 +662,11 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ }); } future operator()(after_continuous_entry_state& state) { + if (_last_key_from_primary_continuity_flags_cleared != _cache._tracker.continuity_flags_cleared() + && !_cache.has_continuous_entry(*_last_key_from_primary.value)) { + _state = after_not_continuous_entry_state{}; + return operator()(); + } return _primary().then([this] (just_cache_scanning_reader::cache_data&& data) { if (!data.mut) { switch_to_end(); @@ -774,7 +795,7 @@ void row_cache::clear_now() noexcept { deleter(p); }); } - _partitions.begin()->set_continuous(false); + _tracker.clear_continuity(*_partitions.begin()); }); } @@ -862,7 +883,7 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec _partitions.insert(cache_i, *entry); } else { --cache_i; - cache_i->set_continuous(false); + _tracker.clear_continuity(*cache_i); } i = m.partitions.erase(i); current_allocator().destroy(&mem_e); @@ -898,10 +919,10 @@ void row_cache::touch(const dht::decorated_key& dk) { void row_cache::invalidate_locked(const dht::decorated_key& dk) { auto pos = _partitions.lower_bound(dk, cache_entry::compare(_schema)); if (pos == _partitions.end()) { - _partitions.rbegin()->set_continuous(false); + _tracker.clear_continuity(*_partitions.rbegin()); } else if (!pos->key().equal(*_schema, dk)) { --pos; - pos->set_continuous(false); + _tracker.clear_continuity(*pos); } else { auto end = pos; ++end; @@ -912,7 +933,7 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) { }); assert (it != _partitions.begin()); --it; - it->set_continuous(false); + _tracker.clear_continuity(*it); } } @@ -973,7 +994,21 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) { }); assert(it != _partitions.begin()); --it; - it->set_continuous(false); + _tracker.clear_continuity(*it); + }); +} + +bool row_cache::has_continuous_entry(const dht::ring_position& key) const { + return with_linearized_managed_bytes([&] { + auto i = _partitions.lower_bound(key, cache_entry::compare(_schema)); + if (i == _partitions.end()) { + return _partitions.rbegin()->continuous(); + } + if (!i->key().equal(*_schema, key)) { + --i; + return i->continuous(); + } + return i->continuous(); }); } diff --git a/row_cache.hh b/row_cache.hh index 2962686ec2..cf02ee3727 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -156,6 +156,7 @@ private: uint64_t _removals = 0; uint64_t _partitions = 0; uint64_t _modification_count = 0; + uint64_t _continuity_flags_cleared = 0; std::unique_ptr _collectd_registrations; logalloc::region _region; lru_type _lru; @@ -167,17 +168,20 @@ public: void clear(); void touch(cache_entry&); void insert(cache_entry&); + void clear_continuity(cache_entry& ce); void on_erase(); void on_merge(); void on_hit(); void on_miss(); void on_uncached_wide_partition(); + void on_continuity_flag_cleared(); allocation_strategy& allocator(); logalloc::region& region(); const logalloc::region& region() const; uint64_t modification_count() const { return _modification_count; } uint64_t partitions() const { return _partitions; } uint64_t uncached_wide_partitions() const { return _uncached_wide_partitions; } + uint64_t continuity_flags_cleared() const { return _continuity_flags_cleared; } }; // Returns a reference to shard-wide cache_tracker. @@ -294,6 +298,8 @@ public: // The range must be kept alive until method resolves. future<> invalidate(const query::partition_range&); + bool has_continuous_entry(const dht::ring_position& key) const; + auto num_entries() const { return _partitions.size(); } diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index 9e2bfd61be..5ef63d48ad 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -896,6 +896,60 @@ static key_source make_key_source(schema_ptr s, std::vector mt = make_lw_shared(s); + + cache_tracker tracker; + row_cache cache(s, mt->as_data_source(), mt->as_key_source(), tracker); + + auto ring = make_ring(s, 4); + for (auto&& m : ring) { + mt->apply(m); + } + + // Bring ring[2]and ring[3] to cache. + assert_that(cache.make_reader(s, query::partition_range::make_starting_with({ ring[2].ring_position(), true }))) + .produces(ring[2]) + .produces(ring[3]) + .produces_end_of_stream(); + + // Start reader with full range. + auto rd = assert_that(cache.make_reader(s, query::full_partition_range)); + rd.produces(ring[0]); + + // Invalidate ring[2] and ring[3] + cache.invalidate(query::partition_range::make_starting_with({ ring[2].ring_position(), true })).get(); + + // Continue previous reader. + rd.produces(ring[1]) + .produces(ring[2]) + .produces(ring[3]) + .produces_end_of_stream(); + + // Start another reader with full range. + rd = assert_that(cache.make_reader(s, query::full_partition_range)); + rd.produces(ring[0]) + .produces(ring[1]) + .produces(ring[2]); + + // Invalidate whole cache. + cache.clear().get(); + + rd.produces(ring[3]) + .produces_end_of_stream(); + + // Start yet another reader with full range. + assert_that(cache.make_reader(s, query::full_partition_range)) + .produces(ring[0]) + .produces(ring[1]) + .produces(ring[2]) + .produces(ring[3]) + .produces_end_of_stream();; + }); +} + SEASTAR_TEST_CASE(test_cache_population_and_update_race) { return seastar::async([] { auto s = make_schema();