mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-28 10:41:12 +00:00
Use continuity flag correctly with concurrent invalidations
Between reading cache entry and actually using it invalidations can happen so we have to check if no flag was cleared if it was we need to read the entry again. Fixes #1464. Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com> Message-Id: <7856b0ded45e42774ccd6f402b5ee42175bd73cf.1469701026.git.piotr@scylladb.com>
This commit is contained in:
committed by
Paweł Dziepak
parent
25a44ee6cf
commit
fdfd1af694
53
row_cache.cc
53
row_cache.cc
@@ -75,7 +75,7 @@ cache_tracker::cache_tracker() {
|
||||
cache_entry& ce = _lru.back();
|
||||
auto it = row_cache::partitions_type::s_iterator_to(ce);
|
||||
--it;
|
||||
it->set_continuous(false);
|
||||
clear_continuity(*it);
|
||||
_lru.pop_back_and_dispose(current_deleter<cache_entry>());
|
||||
--_partitions;
|
||||
++_evictions;
|
||||
@@ -163,7 +163,7 @@ void cache_tracker::clear() {
|
||||
_lru.erase(_lru.iterator_to(to_remove));
|
||||
current_deleter<cache_entry>()(&to_remove);
|
||||
}
|
||||
it->set_continuous(false);
|
||||
clear_continuity(*it);
|
||||
}
|
||||
});
|
||||
_removals += _partitions;
|
||||
@@ -205,6 +205,10 @@ void cache_tracker::on_uncached_wide_partition() {
|
||||
++_uncached_wide_partitions;
|
||||
}
|
||||
|
||||
void cache_tracker::on_continuity_flag_cleared() {
|
||||
++_continuity_flags_cleared;
|
||||
}
|
||||
|
||||
allocation_strategy& cache_tracker::allocator() {
|
||||
return _region.allocator();
|
||||
}
|
||||
@@ -269,6 +273,11 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void cache_tracker::clear_continuity(cache_entry& ce) {
|
||||
ce.set_continuous(false);
|
||||
on_continuity_flag_cleared();
|
||||
}
|
||||
|
||||
void row_cache::on_hit() {
|
||||
_stats.hits.mark();
|
||||
_tracker.on_hit();
|
||||
@@ -335,6 +344,7 @@ private:
|
||||
public:
|
||||
struct cache_data {
|
||||
streamed_mutation_opt mut;
|
||||
uint64_t continuity_flags_cleared;
|
||||
bool continuous;
|
||||
};
|
||||
just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range, query::clustering_key_filtering_context ck_filtering)
|
||||
@@ -351,7 +361,7 @@ public:
|
||||
++_it;
|
||||
_last = ce.key();
|
||||
_cache.upgrade_entry(ce);
|
||||
cache_data data{std::move(ce.read(_cache, _schema, _ck_filtering)), ce.continuous()};
|
||||
cache_data data{std::move(ce.read(_cache, _schema, _ck_filtering)), _cache._tracker.continuity_flags_cleared(), ce.continuous()};
|
||||
return make_ready_future<cache_data>(std::move(data));
|
||||
});
|
||||
});
|
||||
@@ -520,6 +530,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
|
||||
just_cache_scanning_reader _primary;
|
||||
last_key _last_key_from_primary;
|
||||
utils::phased_barrier::phase_type _last_key_from_primary_populate_phase;
|
||||
uint64_t _last_key_from_primary_continuity_flags_cleared;
|
||||
query::clustering_key_filtering_context _ck_filtering;
|
||||
boost::variant<end_state,
|
||||
secondary_only_state,
|
||||
@@ -532,6 +543,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
|
||||
if (!bound_opt) {
|
||||
_last_key_from_primary = {_cache._partitions.begin()->key(), true};
|
||||
_last_key_from_primary_populate_phase = _cache._populate_phaser.phase();
|
||||
_last_key_from_primary_continuity_flags_cleared = _cache._tracker.continuity_flags_cleared();
|
||||
return _cache._partitions.begin()->continuous();
|
||||
}
|
||||
const range_bound<dht::ring_position>& bound = bound_opt.value();
|
||||
@@ -547,6 +559,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
|
||||
(!bound.is_inclusive() || bound.value().relation_to_keys() == -1)) {
|
||||
_last_key_from_primary = {i->key(), true};
|
||||
_last_key_from_primary_populate_phase = _cache._populate_phaser.phase();
|
||||
_last_key_from_primary_continuity_flags_cleared = _cache._tracker.continuity_flags_cleared();
|
||||
return i->continuous();
|
||||
}
|
||||
--i;
|
||||
@@ -578,6 +591,9 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
|
||||
// We have to capture mutation from data before we change the state because data lives in state
|
||||
// and changing state destroys previous state.
|
||||
streamed_mutation_opt result = std::move(data.mut);
|
||||
if (_cache._tracker.continuity_flags_cleared() != data.continuity_flags_cleared) {
|
||||
data.continuous = _cache.has_continuous_entry(*_last_key_from_primary.value);
|
||||
}
|
||||
if (data.continuous) {
|
||||
_state = after_continuous_entry_state{};
|
||||
} else {
|
||||
@@ -646,6 +662,11 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
|
||||
});
|
||||
}
|
||||
future<streamed_mutation_opt> operator()(after_continuous_entry_state& state) {
|
||||
if (_last_key_from_primary_continuity_flags_cleared != _cache._tracker.continuity_flags_cleared()
|
||||
&& !_cache.has_continuous_entry(*_last_key_from_primary.value)) {
|
||||
_state = after_not_continuous_entry_state{};
|
||||
return operator()();
|
||||
}
|
||||
return _primary().then([this] (just_cache_scanning_reader::cache_data&& data) {
|
||||
if (!data.mut) {
|
||||
switch_to_end();
|
||||
@@ -774,7 +795,7 @@ void row_cache::clear_now() noexcept {
|
||||
deleter(p);
|
||||
});
|
||||
}
|
||||
_partitions.begin()->set_continuous(false);
|
||||
_tracker.clear_continuity(*_partitions.begin());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -862,7 +883,7 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
|
||||
_partitions.insert(cache_i, *entry);
|
||||
} else {
|
||||
--cache_i;
|
||||
cache_i->set_continuous(false);
|
||||
_tracker.clear_continuity(*cache_i);
|
||||
}
|
||||
i = m.partitions.erase(i);
|
||||
current_allocator().destroy(&mem_e);
|
||||
@@ -898,10 +919,10 @@ void row_cache::touch(const dht::decorated_key& dk) {
|
||||
void row_cache::invalidate_locked(const dht::decorated_key& dk) {
|
||||
auto pos = _partitions.lower_bound(dk, cache_entry::compare(_schema));
|
||||
if (pos == _partitions.end()) {
|
||||
_partitions.rbegin()->set_continuous(false);
|
||||
_tracker.clear_continuity(*_partitions.rbegin());
|
||||
} else if (!pos->key().equal(*_schema, dk)) {
|
||||
--pos;
|
||||
pos->set_continuous(false);
|
||||
_tracker.clear_continuity(*pos);
|
||||
} else {
|
||||
auto end = pos;
|
||||
++end;
|
||||
@@ -912,7 +933,7 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
|
||||
});
|
||||
assert (it != _partitions.begin());
|
||||
--it;
|
||||
it->set_continuous(false);
|
||||
_tracker.clear_continuity(*it);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -973,7 +994,21 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) {
|
||||
});
|
||||
assert(it != _partitions.begin());
|
||||
--it;
|
||||
it->set_continuous(false);
|
||||
_tracker.clear_continuity(*it);
|
||||
});
|
||||
}
|
||||
|
||||
bool row_cache::has_continuous_entry(const dht::ring_position& key) const {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
auto i = _partitions.lower_bound(key, cache_entry::compare(_schema));
|
||||
if (i == _partitions.end()) {
|
||||
return _partitions.rbegin()->continuous();
|
||||
}
|
||||
if (!i->key().equal(*_schema, key)) {
|
||||
--i;
|
||||
return i->continuous();
|
||||
}
|
||||
return i->continuous();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -156,6 +156,7 @@ private:
|
||||
uint64_t _removals = 0;
|
||||
uint64_t _partitions = 0;
|
||||
uint64_t _modification_count = 0;
|
||||
uint64_t _continuity_flags_cleared = 0;
|
||||
std::unique_ptr<scollectd::registrations> _collectd_registrations;
|
||||
logalloc::region _region;
|
||||
lru_type _lru;
|
||||
@@ -167,17 +168,20 @@ public:
|
||||
void clear();
|
||||
void touch(cache_entry&);
|
||||
void insert(cache_entry&);
|
||||
void clear_continuity(cache_entry& ce);
|
||||
void on_erase();
|
||||
void on_merge();
|
||||
void on_hit();
|
||||
void on_miss();
|
||||
void on_uncached_wide_partition();
|
||||
void on_continuity_flag_cleared();
|
||||
allocation_strategy& allocator();
|
||||
logalloc::region& region();
|
||||
const logalloc::region& region() const;
|
||||
uint64_t modification_count() const { return _modification_count; }
|
||||
uint64_t partitions() const { return _partitions; }
|
||||
uint64_t uncached_wide_partitions() const { return _uncached_wide_partitions; }
|
||||
uint64_t continuity_flags_cleared() const { return _continuity_flags_cleared; }
|
||||
};
|
||||
|
||||
// Returns a reference to shard-wide cache_tracker.
|
||||
@@ -294,6 +298,8 @@ public:
|
||||
// The range must be kept alive until method resolves.
|
||||
future<> invalidate(const query::partition_range&);
|
||||
|
||||
bool has_continuous_entry(const dht::ring_position& key) const;
|
||||
|
||||
auto num_entries() const {
|
||||
return _partitions.size();
|
||||
}
|
||||
|
||||
@@ -896,6 +896,60 @@ static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtab
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_continuity_flag_and_invalidate_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
|
||||
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, mt->as_data_source(), mt->as_key_source(), tracker);
|
||||
|
||||
auto ring = make_ring(s, 4);
|
||||
for (auto&& m : ring) {
|
||||
mt->apply(m);
|
||||
}
|
||||
|
||||
// Bring ring[2]and ring[3] to cache.
|
||||
assert_that(cache.make_reader(s, query::partition_range::make_starting_with({ ring[2].ring_position(), true })))
|
||||
.produces(ring[2])
|
||||
.produces(ring[3])
|
||||
.produces_end_of_stream();
|
||||
|
||||
// Start reader with full range.
|
||||
auto rd = assert_that(cache.make_reader(s, query::full_partition_range));
|
||||
rd.produces(ring[0]);
|
||||
|
||||
// Invalidate ring[2] and ring[3]
|
||||
cache.invalidate(query::partition_range::make_starting_with({ ring[2].ring_position(), true })).get();
|
||||
|
||||
// Continue previous reader.
|
||||
rd.produces(ring[1])
|
||||
.produces(ring[2])
|
||||
.produces(ring[3])
|
||||
.produces_end_of_stream();
|
||||
|
||||
// Start another reader with full range.
|
||||
rd = assert_that(cache.make_reader(s, query::full_partition_range));
|
||||
rd.produces(ring[0])
|
||||
.produces(ring[1])
|
||||
.produces(ring[2]);
|
||||
|
||||
// Invalidate whole cache.
|
||||
cache.clear().get();
|
||||
|
||||
rd.produces(ring[3])
|
||||
.produces_end_of_stream();
|
||||
|
||||
// Start yet another reader with full range.
|
||||
assert_that(cache.make_reader(s, query::full_partition_range))
|
||||
.produces(ring[0])
|
||||
.produces(ring[1])
|
||||
.produces(ring[2])
|
||||
.produces(ring[3])
|
||||
.produces_end_of_stream();;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
|
||||
Reference in New Issue
Block a user