diff --git a/row_cache.cc b/row_cache.cc index eb2ca182e8..02e57fdcc3 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -75,7 +75,7 @@ cache_tracker::cache_tracker() { cache_entry& ce = _lru.back(); auto it = row_cache::partitions_type::s_iterator_to(ce); --it; - it->set_continuous(false); + clear_continuity(*it); _lru.pop_back_and_dispose(current_deleter()); --_partitions; ++_evictions; @@ -163,7 +163,7 @@ void cache_tracker::clear() { _lru.erase(_lru.iterator_to(to_remove)); current_deleter()(&to_remove); } - it->set_continuous(false); + clear_continuity(*it); } }); _removals += _partitions; @@ -205,6 +205,10 @@ void cache_tracker::on_uncached_wide_partition() { ++_uncached_wide_partitions; } +void cache_tracker::on_continuity_flag_cleared() { + ++_continuity_flags_cleared; +} + allocation_strategy& cache_tracker::allocator() { return _region.allocator(); } @@ -269,6 +273,11 @@ public: } }; +void cache_tracker::clear_continuity(cache_entry& ce) { + ce.set_continuous(false); + on_continuity_flag_cleared(); +} + void row_cache::on_hit() { _stats.hits.mark(); _tracker.on_hit(); @@ -335,6 +344,7 @@ private: public: struct cache_data { streamed_mutation_opt mut; + uint64_t continuity_flags_cleared; bool continuous; }; just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range, query::clustering_key_filtering_context ck_filtering) @@ -351,7 +361,7 @@ public: ++_it; _last = ce.key(); _cache.upgrade_entry(ce); - cache_data data{std::move(ce.read(_cache, _schema, _ck_filtering)), ce.continuous()}; + cache_data data{std::move(ce.read(_cache, _schema, _ck_filtering)), _cache._tracker.continuity_flags_cleared(), ce.continuous()}; return make_ready_future(std::move(data)); }); }); @@ -520,6 +530,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ just_cache_scanning_reader _primary; last_key _last_key_from_primary; utils::phased_barrier::phase_type _last_key_from_primary_populate_phase; + uint64_t _last_key_from_primary_continuity_flags_cleared; query::clustering_key_filtering_context _ck_filtering; boost::variantkey(), true}; _last_key_from_primary_populate_phase = _cache._populate_phaser.phase(); + _last_key_from_primary_continuity_flags_cleared = _cache._tracker.continuity_flags_cleared(); return _cache._partitions.begin()->continuous(); } const range_bound& bound = bound_opt.value(); @@ -547,6 +559,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ (!bound.is_inclusive() || bound.value().relation_to_keys() == -1)) { _last_key_from_primary = {i->key(), true}; _last_key_from_primary_populate_phase = _cache._populate_phaser.phase(); + _last_key_from_primary_continuity_flags_cleared = _cache._tracker.continuity_flags_cleared(); return i->continuous(); } --i; @@ -578,6 +591,9 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ // We have to capture mutation from data before we change the state because data lives in state // and changing state destroys previous state. streamed_mutation_opt result = std::move(data.mut); + if (_cache._tracker.continuity_flags_cleared() != data.continuity_flags_cleared) { + data.continuous = _cache.has_continuous_entry(*_last_key_from_primary.value); + } if (data.continuous) { _state = after_continuous_entry_state{}; } else { @@ -646,6 +662,11 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ }); } future operator()(after_continuous_entry_state& state) { + if (_last_key_from_primary_continuity_flags_cleared != _cache._tracker.continuity_flags_cleared() + && !_cache.has_continuous_entry(*_last_key_from_primary.value)) { + _state = after_not_continuous_entry_state{}; + return operator()(); + } return _primary().then([this] (just_cache_scanning_reader::cache_data&& data) { if (!data.mut) { switch_to_end(); @@ -774,7 +795,7 @@ void row_cache::clear_now() noexcept { deleter(p); }); } - _partitions.begin()->set_continuous(false); + _tracker.clear_continuity(*_partitions.begin()); }); } @@ -862,7 +883,7 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec _partitions.insert(cache_i, *entry); } else { --cache_i; - cache_i->set_continuous(false); + _tracker.clear_continuity(*cache_i); } i = m.partitions.erase(i); current_allocator().destroy(&mem_e); @@ -898,10 +919,10 @@ void row_cache::touch(const dht::decorated_key& dk) { void row_cache::invalidate_locked(const dht::decorated_key& dk) { auto pos = _partitions.lower_bound(dk, cache_entry::compare(_schema)); if (pos == _partitions.end()) { - _partitions.rbegin()->set_continuous(false); + _tracker.clear_continuity(*_partitions.rbegin()); } else if (!pos->key().equal(*_schema, dk)) { --pos; - pos->set_continuous(false); + _tracker.clear_continuity(*pos); } else { auto end = pos; ++end; @@ -912,7 +933,7 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) { }); assert (it != _partitions.begin()); --it; - it->set_continuous(false); + _tracker.clear_continuity(*it); } } @@ -973,7 +994,21 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) { }); assert(it != _partitions.begin()); --it; - it->set_continuous(false); + _tracker.clear_continuity(*it); + }); +} + +bool row_cache::has_continuous_entry(const dht::ring_position& key) const { + return with_linearized_managed_bytes([&] { + auto i = _partitions.lower_bound(key, cache_entry::compare(_schema)); + if (i == _partitions.end()) { + return _partitions.rbegin()->continuous(); + } + if (!i->key().equal(*_schema, key)) { + --i; + return i->continuous(); + } + return i->continuous(); }); } diff --git a/row_cache.hh b/row_cache.hh index 2962686ec2..cf02ee3727 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -156,6 +156,7 @@ private: uint64_t _removals = 0; uint64_t _partitions = 0; uint64_t _modification_count = 0; + uint64_t _continuity_flags_cleared = 0; std::unique_ptr _collectd_registrations; logalloc::region _region; lru_type _lru; @@ -167,17 +168,20 @@ public: void clear(); void touch(cache_entry&); void insert(cache_entry&); + void clear_continuity(cache_entry& ce); void on_erase(); void on_merge(); void on_hit(); void on_miss(); void on_uncached_wide_partition(); + void on_continuity_flag_cleared(); allocation_strategy& allocator(); logalloc::region& region(); const logalloc::region& region() const; uint64_t modification_count() const { return _modification_count; } uint64_t partitions() const { return _partitions; } uint64_t uncached_wide_partitions() const { return _uncached_wide_partitions; } + uint64_t continuity_flags_cleared() const { return _continuity_flags_cleared; } }; // Returns a reference to shard-wide cache_tracker. @@ -294,6 +298,8 @@ public: // The range must be kept alive until method resolves. future<> invalidate(const query::partition_range&); + bool has_continuous_entry(const dht::ring_position& key) const; + auto num_entries() const { return _partitions.size(); } diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index 9e2bfd61be..5ef63d48ad 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -896,6 +896,60 @@ static key_source make_key_source(schema_ptr s, std::vector mt = make_lw_shared(s); + + cache_tracker tracker; + row_cache cache(s, mt->as_data_source(), mt->as_key_source(), tracker); + + auto ring = make_ring(s, 4); + for (auto&& m : ring) { + mt->apply(m); + } + + // Bring ring[2]and ring[3] to cache. + assert_that(cache.make_reader(s, query::partition_range::make_starting_with({ ring[2].ring_position(), true }))) + .produces(ring[2]) + .produces(ring[3]) + .produces_end_of_stream(); + + // Start reader with full range. + auto rd = assert_that(cache.make_reader(s, query::full_partition_range)); + rd.produces(ring[0]); + + // Invalidate ring[2] and ring[3] + cache.invalidate(query::partition_range::make_starting_with({ ring[2].ring_position(), true })).get(); + + // Continue previous reader. + rd.produces(ring[1]) + .produces(ring[2]) + .produces(ring[3]) + .produces_end_of_stream(); + + // Start another reader with full range. + rd = assert_that(cache.make_reader(s, query::full_partition_range)); + rd.produces(ring[0]) + .produces(ring[1]) + .produces(ring[2]); + + // Invalidate whole cache. + cache.clear().get(); + + rd.produces(ring[3]) + .produces_end_of_stream(); + + // Start yet another reader with full range. + assert_that(cache.make_reader(s, query::full_partition_range)) + .produces(ring[0]) + .produces(ring[1]) + .produces(ring[2]) + .produces(ring[3]) + .produces_end_of_stream();; + }); +} + SEASTAR_TEST_CASE(test_cache_population_and_update_race) { return seastar::async([] { auto s = make_schema();