Merge "Do not evict from memtable snapshots" from Tomasz

"When moving whole partition entries from memtable to cache, we move
snapshots as well. It is incorrect to evict from such snapshots
though, because associated readers would miss data.

Solution is to record evictability of partition version references (snapshots)
and avoiding eviction from non-evictable snapshots.

Could affect scanning reads, if the reader uses partition entry from
memtable, and the partition is too large to fit in reader's buffer,
and that entry gets moved to cache (was absent in cache), and then
gets evicted (memory pressure). The reader will not see the remainder
of that entry. Found during code review.

Introduced in ca8e3c4, so affects 2.1+

Fixes #3186.

Tests: unit (release)"

* 'tgrabiec/do-not-evict-memtable-snapshots' of github.com:tgrabiec/scylla:
  tests: mvcc: Add test for eviction with non-evictable snapshots
  mutation_partition: Define + operator on tombstones
  tests: mvcc: Check that partition is fully discontinuous after eviction
  tests: row_cache: Add test for memtable readers surviving flush and eviction
  memtable: Make printable
  mvcc: Take partition_entry by const ref in operator<<()
  mvcc: Do not evict from non-evictable snapshots
  mvcc: Drop unnecessary assignment to partition_snapshot::_version
  tests: Use partition_entry::make_evictable() where appropriate
  mvcc: Encapsulate construction of evictable entries
This commit is contained in:
Paweł Dziepak
2018-02-06 14:46:24 +00:00
8 changed files with 209 additions and 50 deletions

View File

@@ -682,3 +682,12 @@ void memtable::upgrade_entry(memtable_entry& e) {
void memtable::set_schema(schema_ptr new_schema) noexcept {
_schema = std::move(new_schema);
}
std::ostream& operator<<(std::ostream& out, memtable& mt) {
logalloc::reclaim_lock rl(mt);
return out << "{memtable: [" << ::join(",\n", mt.partitions) << "]}";
}
std::ostream& operator<<(std::ostream& out, const memtable_entry& mt) {
return out << "{" << mt.key() << ": " << mt.partition() << "}";
}

View File

@@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <iosfwd>
#include "database_fwd.hh"
#include "dht/i_partitioner.hh"
#include "schema.hh"
@@ -105,6 +106,8 @@ public:
return _c(k1, k2._key);
}
};
friend std::ostream& operator<<(std::ostream&, const memtable_entry&);
};
class dirty_memory_manager;
@@ -229,4 +232,6 @@ public:
dirty_memory_manager& get_dirty_memory_manager() {
return _dirty_mgr;
}
friend std::ostream& operator<<(std::ostream&, memtable&);
};

View File

@@ -170,14 +170,8 @@ void partition_snapshot::merge_partition_versions() {
auto current = first_used->next();
while (current && !current->is_referenced()) {
auto next = current->next();
try {
merge_versions(*_schema, first_used->partition(), std::move(current->partition()));
current_allocator().destroy(current);
} catch (...) {
// Set _version so that the merge can be retried.
_version = partition_version_ref(*current);
throw;
}
merge_versions(*_schema, first_used->partition(), std::move(current->partition()));
current_allocator().destroy(current);
current = next;
}
}
@@ -196,7 +190,41 @@ unsigned partition_snapshot::version_count()
partition_entry::partition_entry(mutation_partition mp)
{
auto new_version = current_allocator().construct<partition_version>(std::move(mp));
_version = partition_version_ref(*new_version);
_version = partition_version_ref(*new_version, partition_version::is_evictable::no);
}
partition_entry::partition_entry(partition_entry::evictable_tag, const schema& s, mutation_partition&& mp)
: partition_entry(std::move(mp))
{
_version->partition().ensure_last_dummy(s);
_version.make_evictable();
}
partition_entry::partition_entry(partition_entry::evictable_tag, const schema& s, partition_entry&& e)
: partition_entry(std::move(e))
{
if (_snapshot) {
// We must not change evictability of existing snapshots
// FIXME: https://github.com/scylladb/scylla/issues/1938
add_version(s);
}
_version.make_evictable();
}
partition_entry partition_entry::make_evictable(const schema& s, mutation_partition&& mp) {
return {evictable_tag(), s, std::move(mp)};
}
partition_entry partition_entry::make_evictable(const schema& s, const mutation_partition& mp) {
return make_evictable(s, mutation_partition(mp));
}
partition_entry partition_entry::make_evictable(const schema& s, partition_entry&& pe) {
// If we can assume that _pe is fully continuous, we don't need to check all versions
// to determine what the continuity is.
// This doesn't change value and doesn't invalidate iterators, so can be called even with a snapshot.
pe.version()->partition().ensure_last_dummy(s);
return partition_entry(evictable_tag(), s, std::move(pe));
}
partition_entry::~partition_entry() {
@@ -216,13 +244,15 @@ partition_entry::~partition_entry() {
void partition_entry::set_version(partition_version* new_version)
{
bool evictable = _version.evictable();
if (_snapshot) {
_snapshot->_version = std::move(_version);
_snapshot->_entry = nullptr;
}
_snapshot = nullptr;
_version = partition_version_ref(*new_version);
_version = partition_version_ref(*new_version, partition_version::is_evictable(evictable));
}
partition_version& partition_entry::add_version(const schema& s) {
@@ -368,7 +398,7 @@ void partition_entry::with_detached_versions(Func&& func) {
snapshot->_entry = nullptr;
_snapshot = nullptr;
}
_version = { };
auto prev = std::exchange(_version, {});
auto revert = defer([&] {
if (snapshot) {
@@ -376,7 +406,7 @@ void partition_entry::with_detached_versions(Func&& func) {
snapshot->_entry = this;
_version = std::move(snapshot->_version);
} else {
_version = partition_version_ref(*current);
_version = std::move(prev);
}
});
@@ -510,18 +540,20 @@ partition_snapshot::range_tombstones()
position_in_partition_view::after_all_clustered_rows());
}
std::ostream& operator<<(std::ostream& out, partition_entry& e) {
std::ostream& operator<<(std::ostream& out, const partition_entry& e) {
out << "{";
bool first = true;
if (e._version) {
for (const partition_version& v : e.versions()) {
const partition_version* v = &*e._version;
while (v) {
if (!first) {
out << ", ";
}
if (v.is_referenced()) {
if (v->is_referenced()) {
out << "(*) ";
}
out << v.partition();
out << v->partition();
v = v->next();
first = false;
}
}
@@ -534,6 +566,9 @@ void partition_entry::evict() noexcept {
return;
}
for (auto&& v : versions()) {
if (v.is_referenced() && !v.back_reference().evictable()) {
break;
}
v.partition().evict();
}
current_allocator().invalidate_references();

View File

@@ -117,6 +117,8 @@ class partition_version : public anchorless_list_base_hook<partition_version> {
friend class partition_version_ref;
public:
using is_evictable = bool_class<class evictable_tag>;
explicit partition_version(schema_ptr s) noexcept
: _partition(std::move(s)) { }
explicit partition_version(mutation_partition mp) noexcept
@@ -139,11 +141,15 @@ using partition_version_range = anchorless_list_base_hook<partition_version>::ra
class partition_version_ref {
partition_version* _version = nullptr;
bool _unique_owner = false;
bool _evictable;
friend class partition_version;
public:
partition_version_ref() = default;
explicit partition_version_ref(partition_version& pv) noexcept : _version(&pv) {
explicit partition_version_ref(partition_version& pv, partition_version::is_evictable ev) noexcept
: _version(&pv)
, _evictable(ev)
{
assert(!_version->_backref);
_version->_backref = this;
}
@@ -152,7 +158,10 @@ public:
_version->_backref = nullptr;
}
}
partition_version_ref(partition_version_ref&& other) noexcept : _version(other._version) {
partition_version_ref(partition_version_ref&& other) noexcept
: _version(other._version)
, _evictable(other._evictable)
{
if (_version) {
_version->_backref = this;
}
@@ -187,6 +196,8 @@ public:
bool is_unique_owner() const { return _unique_owner; }
void mark_as_unique_owner() { _unique_owner = true; }
void make_evictable() { _evictable = true; }
bool evictable() const { return _evictable; }
};
class partition_entry;
@@ -287,6 +298,11 @@ public:
// objects called versions. The logical mutation_partition state represented
// by that chain is equal to reducing the chain using mutation_partition::apply()
// from left (latest version) to right.
//
// We distinguish evictable and non-evictable partition entries. Entries which
// are non-evictable have all their elements non-evictable and fully continuous.
// Partition snapshots inherit evictability of the entry, which remains invariant
// for a snapshot.
class partition_entry {
partition_snapshot* _snapshot = nullptr;
partition_version_ref _version;
@@ -303,11 +319,22 @@ private:
void apply_to_incomplete(const schema& s, partition_version* other, logalloc::region&);
public:
struct evictable_tag {};
class rows_iterator;
// Constructs a non-evictable entry holding empty partition
partition_entry() = default;
// Constructs a non-evictable entry
explicit partition_entry(mutation_partition mp);
// Constructs an evictable entry
partition_entry(evictable_tag, const schema& s, mutation_partition&& mp);
partition_entry(evictable_tag, const schema& s, partition_entry&&);
~partition_entry();
static partition_entry make_evictable(const schema& s, mutation_partition&& mp);
static partition_entry make_evictable(const schema& s, const mutation_partition& mp);
// pe must be a non-evictable fully continuous entry.
static partition_entry make_evictable(const schema& s, partition_entry&& pe);
partition_entry(partition_entry&& pe) noexcept
: _snapshot(pe._snapshot), _version(std::move(pe._version))
{
@@ -338,15 +365,14 @@ public:
// Strong exception guarantees.
// Assumes this instance and mp are fully continuous.
// Use only on non-evictable entries.
void apply(const schema& s, const mutation_partition& mp, const schema& mp_schema);
void apply(const schema& s, mutation_partition&& mp, const schema& mp_schema);
// Strong exception guarantees.
// Assumes this instance and mpv are fully continuous.
void apply(const schema& s, mutation_partition_view mpv, const schema& mp_schema);
// Adds mutation_partition represented by "other" to the one represented
// by this entry.
// This entry must be evictable.
//
// The argument must be fully-continuous.
//
@@ -388,7 +414,7 @@ public:
lw_shared_ptr<partition_snapshot> read(logalloc::region& region, schema_ptr entry_schema,
partition_snapshot::phase_type phase = partition_snapshot::default_phase);
friend std::ostream& operator<<(std::ostream& out, partition_entry& e);
friend std::ostream& operator<<(std::ostream& out, const partition_entry& e);
};
// Monotonic exception guarantees

View File

@@ -101,30 +101,21 @@ public:
cache_entry(schema_ptr s, const dht::decorated_key& key, const mutation_partition& p)
: _schema(std::move(s))
, _key(key)
, _pe(p)
{
_pe.version()->partition().ensure_last_dummy(*_schema);
}
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(p)))
{ }
cache_entry(schema_ptr s, dht::decorated_key&& key, mutation_partition&& p) noexcept
: _schema(std::move(s))
, _key(std::move(key))
, _pe(std::move(p))
{
_pe.version()->partition().ensure_last_dummy(*_schema);
}
, _pe(partition_entry::make_evictable(*_schema, std::move(p)))
{ }
// It is assumed that pe is fully continuous
cache_entry(schema_ptr s, dht::decorated_key&& key, partition_entry&& pe) noexcept
: _schema(std::move(s))
, _key(std::move(key))
, _pe(std::move(pe))
{
// If we can assume that _pe is fully continuous, we don't need to check all versions
// to determine what the continuity is.
// This doesn't change value and doesn't invalidate iterators, so can be called even with a snapshot.
_pe.version()->partition().ensure_last_dummy(*_schema);
}
, _pe(partition_entry::make_evictable(*_schema, std::move(pe)))
{ }
cache_entry(cache_entry&&) noexcept;
~cache_entry();

View File

@@ -284,7 +284,7 @@ SEASTAR_TEST_CASE(test_full_eviction_marks_affected_range_as_discontinuous) {
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
auto e = partition_entry(mutation_partition(table.schema()));
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto t = table.new_tombstone();
auto&& p1 = e.open_version(s).partition();
@@ -300,15 +300,8 @@ SEASTAR_TEST_CASE(test_full_eviction_marks_affected_range_as_discontinuous) {
e.evict();
BOOST_REQUIRE(snap1->squashed().fully_discontinuous(s, position_range(
position_in_partition::before_all_clustered_rows(),
position_in_partition::after_key(ck2)
)));
BOOST_REQUIRE(snap2->squashed().fully_discontinuous(s, position_range(
position_in_partition::before_all_clustered_rows(),
position_in_partition::after_key(ck2)
)));
BOOST_REQUIRE(snap1->squashed().fully_discontinuous(s, position_range::all_clustered_rows()));
BOOST_REQUIRE(snap2->squashed().fully_discontinuous(s, position_range::all_clustered_rows()));
BOOST_REQUIRE(!snap1->squashed().static_row_continuous());
BOOST_REQUIRE(!snap2->squashed().static_row_continuous());
@@ -328,7 +321,7 @@ SEASTAR_TEST_CASE(test_eviction_with_active_reader) {
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
auto e = partition_entry(mutation_partition(table.schema()));
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto&& p1 = e.open_version(s).partition();
p1.clustered_row(s, ck2);
@@ -386,7 +379,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
// Without active reader
auto test = [&] (bool with_active_reader) {
logalloc::reclaim_lock rl(r);
auto e = partition_entry(m3.partition());
auto e = partition_entry::make_evictable(*s, m3.partition());
partition_version& v2 = e.add_version(*s);
v2.partition() = m2.partition();
partition_version& v3 = e.add_version(*s);
@@ -466,7 +459,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
simple_schema table;
auto&& s = *table.schema();
auto e = partition_entry(mutation_partition(table.schema()));
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto snap1 = e.read(r, table.schema());
{
@@ -669,3 +662,48 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) {
do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_eviction_with_mixed_snapshot_evictability) {
return seastar::async([] {
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
auto s = gen.schema();
mutation m1 = gen();
mutation m2 = gen();
m1.partition().make_fully_continuous();
m2.partition().make_fully_continuous();
logalloc::region r;
with_allocator(r.allocator(), [&] {
logalloc::reclaim_lock l(r);
auto e = partition_entry(mutation_partition(s));
e.apply(*s, m1.partition(), *s);
auto snap1 = e.read(r, s); // non-evictable
e = partition_entry::make_evictable(*s, std::move(e));
{
partition_entry tmp(m2.partition());
e.apply_to_incomplete(*s, std::move(tmp), *m2.schema(), r);
}
auto snap2 = e.read(r, s); // evictable
e.evict();
assert_that(s, read_using_cursor(*snap1)).is_equal_to(m1.partition());
snap1 = {};
e.evict();
// Everything should be evicted now, since non-evictable snapshot is gone
auto pt = m1.partition().partition_tombstone() + m2.partition().partition_tombstone();
assert_that(s, read_using_cursor(*snap2))
.is_equal_to(mutation_partition::make_incomplete(*s, pt));
});
});
}

View File

@@ -1804,6 +1804,12 @@ static void apply(row_cache& cache, memtable_snapshot_source& underlying, const
cache.update([&] { underlying.apply(m); }, *mt).get();
}
static void apply(row_cache& cache, memtable_snapshot_source& underlying, memtable& m) {
auto mt1 = make_lw_shared<memtable>(m.schema());
mt1->apply(m).get();
cache.update([&] { underlying.apply(std::move(mt1)); }, m).get();
}
SEASTAR_TEST_CASE(test_readers_get_all_data_after_eviction) {
return seastar::async([] {
simple_schema table;
@@ -2884,3 +2890,46 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
.produces(versions.back());
});
}
SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memtable_readers) {
// Verifies that memtable readers created before memtable is moved to cache
// are not affected by eviction in cache after their partition entries were moved to cache.
// Reproduces https://github.com/scylladb/scylla/issues/3186
return seastar::async([] {
simple_schema ss;
schema_ptr s = ss.schema();
auto m1 = ss.new_mutation("pk1");
const auto n_rows = 10000;
for (auto i = 0u; i < n_rows; ++i) {
ss.add_row(m1, ss.make_ckey(i), "val");
}
cache_tracker tracker;
memtable_snapshot_source underlying(s);
row_cache cache(s, snapshot_source([&] { return underlying(); }), tracker, is_continuous::yes);
lw_shared_ptr<memtable> mt = make_lw_shared<memtable>(s);
mt->apply(m1);
auto mt_rd1 = mt->make_flat_reader(s);
mt_rd1.set_max_buffer_size(1);
mt_rd1.fill_buffer().get();
BOOST_REQUIRE(mt_rd1.is_buffer_full()); // If fails, increase n_rows
auto mt_rd2 = mt->make_flat_reader(s);
mt_rd2.set_max_buffer_size(1);
mt_rd2.fill_buffer().get();
apply(cache, underlying, *mt);
assert_that(std::move(mt_rd1))
.produces(m1);
cache.evict();
assert_that(std::move(mt_rd2))
.produces(m1);
});
}

View File

@@ -80,6 +80,12 @@ struct tombstone final : public with_relational_operators<tombstone> {
std::swap(*this, t);
}
tombstone operator+(const tombstone& t) {
auto result = *this;
result.apply(t);
return result;
}
friend std::ostream& operator<<(std::ostream& out, const tombstone& t) {
if (t) {
return out << "{tombstone: timestamp=" << t.timestamp << ", deletion_time=" << t.deletion_time.time_since_epoch().count() << "}";