Merge "Evict from partition snapshots in cache" from Tomasz

"This series fixes the problem of active reads causing OOM due to the fact that
the partition snapshots they hold are not evictable. In particular, a single scan
of a partition larger than memory will bad_alloc due to itself.

After this, when partition entry is evicted from cache, data in all the snapshots
is also evicted. We still don't have row-level eviction, but this series lays some
grounds for it by making cache readers prepared for the possibility of rows
being evicted.

Fixes #2775.
Fixes #2730."

* tag 'tgrabiec/snapshot-evicition-in-cache-v1' of github.com:scylladb/seastar-dev:
  tests: Add test for partition_entry::evict()
  mutation_partition: Introduce range continuity checking methods
  mutation_partition: Enable rows_entry::compare() on position_in_partition_views
  tests: Extract mvcc tests to separate file
  tests: row_cache: Add evicition tests
  tests: simple_schema: Add new_tombstone() helper
  tests: streamed_mutation_assertions: Introduce produces(mutation&)
  streamed_mutation: Allow setting buffer capacity
  row_cache: Evict partition snapshots
  mvcc: Introduce partition_entry::evict()
  row_cache: Handle eviction in partition reader
  tests: row_cache_test: Don't assume mvcc snapshots are not evictable
  row_cache: Reuse allocation_strategy::invalidate_references()
  row_cache: Don't invalidate references on insertion
  lsa: Move reclaim counter concept to allocation_strategy level
  mvcc: Ensure partition_snapshot always destroys versions using proper allocator
  mvcc: Encapsulate reference stability check in partition_snapshot
  mvcc: Store LSA region reference in partition_snapshot
This commit is contained in:
Avi Kivity
2017-09-13 20:48:33 +03:00
23 changed files with 624 additions and 193 deletions

View File

@@ -141,7 +141,7 @@ public:
, _lower_bound(position_in_partition::before_all_clustered_rows())
, _upper_bound(position_in_partition_view::before_all_clustered_rows())
, _read_context(std::move(ctx))
, _next_row(*_schema, cache._tracker.region(), *_snp)
, _next_row(*_schema, *_snp)
{ }
cache_streamed_mutation(const cache_streamed_mutation&) = delete;
cache_streamed_mutation(cache_streamed_mutation&&) = delete;
@@ -205,10 +205,15 @@ future<> cache_streamed_mutation::do_fill_buffer() {
return read_from_underlying();
}
return _lsa_manager.run_in_read_section([this] {
auto same_pos = _next_row.maybe_refresh();
// FIXME: If continuity changed anywhere between _lower_bound and _next_row.position()
// we need to redo the lookup with _lower_bound. There is no eviction yet, so not yet a problem.
assert(same_pos);
// We assume that if there was eviction, and thus the range may
// no longer be continuous, the cursor was invalidated.
if (!_next_row.up_to_date()) {
auto adjacent = _next_row.advance_to(_lower_bound);
_next_row_in_range = !after_current_range(_next_row.position());
if (!adjacent && !_next_row.continuous()) {
return start_reading_from_underlying();
}
}
while (!is_buffer_full() && !_end_of_stream && !_reading_underlying) {
future<> f = copy_from_cache_to_buffer();
if (!f.available() || need_preempt()) {
@@ -232,7 +237,14 @@ future<> cache_streamed_mutation::read_from_underlying() {
_reading_underlying = false;
return _lsa_manager.run_in_update_section([this] {
auto same_pos = _next_row.maybe_refresh();
assert(same_pos); // FIXME: handle eviction
if (!same_pos) {
_read_context->cache().on_mispopulate(); // FIXME: Insert dummy entry at _upper_bound.
_next_row_in_range = !after_current_range(_next_row.position());
if (!_next_row.continuous()) {
return start_reading_from_underlying();
}
return make_ready_future<>();
}
if (_next_row_in_range) {
maybe_update_continuity();
add_to_buffer(_next_row);

View File

@@ -167,6 +167,7 @@ modes = {
scylla_tests = [
'tests/mutation_test',
'tests/mvcc_test',
'tests/streamed_mutation_test',
'tests/schema_registry_test',
'tests/canonical_mutation_test',

View File

@@ -403,7 +403,7 @@ public:
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
} else {
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
auto snp = e->partition().read(schema());
auto snp = e->partition().read(region(), schema());
auto mpsr = make_partition_snapshot_reader<partition_snapshot_accounter>(schema(), e->key(), std::move(cr),
snp, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, _flushed_memory);
_flushed_memory.account_component(*e);
@@ -547,7 +547,7 @@ memtable_entry::read(lw_shared_ptr<memtable> mtbl,
mutation m = mutation(target_schema, _key, std::move(mp));
return streamed_mutation_from_mutation(std::move(m), fwd);
}
auto snp = _pe.read(_schema);
auto snp = _pe.read(mtbl->region(), _schema);
return make_partition_snapshot_reader(_schema, _key, std::move(cr), snp, *mtbl, mtbl->_read_section, mtbl, fwd);
}

View File

@@ -158,6 +158,10 @@ public:
return *this;
}
logalloc::region& region() {
return *this;
}
logalloc::region_group* region_group() {
return group();
}

View File

@@ -2128,6 +2128,57 @@ void mutation_partition::make_fully_continuous() {
}
}
void mutation_partition::evict() noexcept {
if (!_rows.empty()) {
// We need to keep the last entry to mark the range containing all evicted rows as discontinuous.
// No rows would mean it is continuous.
auto i = _rows.erase_and_dispose(_rows.begin(), std::prev(_rows.end()), current_deleter<rows_entry>());
rows_entry& e = *i;
e._flags._last = true;
e._flags._dummy = true;
e._flags._continuous = false;
e._row = {};
}
_row_tombstones.clear();
_static_row_continuous = false;
_static_row = {};
}
bool
mutation_partition::check_continuity(const schema& s, const position_range& r, is_continuous cont) {
auto less = rows_entry::compare(s);
auto i = _rows.lower_bound(r.start(), less);
auto end = _rows.lower_bound(r.end(), less);
if (!less(r.start(), r.end())) {
return bool(cont);
}
if (i != end) {
if (no_clustering_row_between(s, r.start(), i->position())) {
++i;
}
while (i != end) {
if (i->continuous() != cont) {
return false;
}
++i;
}
if (end != _rows.begin() && no_clustering_row_between(s, std::prev(end)->position(), r.end())) {
return true;
}
}
return (end == _rows.end() ? is_continuous::yes : end->continuous()) == cont;
}
bool
mutation_partition::fully_continuous(const schema& s, const position_range& r) {
return check_continuity(s, r, is_continuous::yes);
}
bool
mutation_partition::fully_discontinuous(const schema& s, const position_range& r) {
return check_continuity(s, r, is_continuous::no);
}
future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& source,
const dht::decorated_key& dk,
const query::partition_slice& slice,

View File

@@ -788,6 +788,9 @@ public:
bool operator()(position_in_partition_view p, const rows_entry& e) const {
return _c(p, e.position()) < 0;
}
bool operator()(position_in_partition_view p1, position_in_partition_view p2) const {
return _c(p1, p2) < 0;
}
};
template <typename Comparator>
struct delegating_compare {
@@ -874,6 +877,8 @@ private:
friend class mutation_partition_applier;
friend class converting_mutation_partition_applier;
bool check_continuity(const schema&, const position_range&, is_continuous);
public:
struct copy_comparators_only {};
struct incomplete_tag {};
@@ -915,6 +920,12 @@ public:
void set_static_row_continuous(bool value) { _static_row_continuous = value; }
bool is_fully_continuous() const;
void make_fully_continuous();
// Returns true iff all keys from given range are marked as continuous, or range is empty.
bool fully_continuous(const schema&, const position_range&);
// Returns true iff all keys from given range are marked as not continuous and range is not empty.
bool fully_discontinuous(const schema&, const position_range&);
// Removes all data, marking affected ranges as discontinuous.
void evict() noexcept;
void apply(tombstone t) { _tombstone.apply(t); }
void apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t);
void apply_delete(const schema& schema, range_tombstone rt);

View File

@@ -101,8 +101,7 @@ private:
return *this;
}
uint64_t _reclaim_counter;
unsigned _version_count = 0;
partition_snapshot::change_mark _change_mark;
private:
void refresh_iterators() {
_clustering_rows.clear();
@@ -207,10 +206,10 @@ private:
}
}
if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
auto mark = _snapshot->get_change_mark();
if (!_in_ck_range || mark != _change_mark) {
refresh_iterators();
_reclaim_counter = _lsa_region.reclaim_counter();
_version_count = _snapshot->version_count();
_change_mark = mark;
}
while (!is_end_of_stream() && !is_buffer_full()) {

View File

@@ -52,13 +52,11 @@ class partition_snapshot_row_cursor final {
};
const schema& _schema;
logalloc::region& _region;
partition_snapshot& _snp;
std::vector<position_in_version> _heap;
std::vector<position_in_version> _current_row;
position_in_partition _position;
uint64_t _last_reclaim_count = 0;
size_t _last_versions_count = 0;
partition_snapshot::change_mark _change_mark;
// Removes the next row from _heap and puts it into _current_row
void recreate_current_row() {
@@ -72,9 +70,8 @@ class partition_snapshot_row_cursor final {
_position = position_in_partition(_current_row[0].it->position());
}
public:
partition_snapshot_row_cursor(const schema& s, logalloc::region& region, partition_snapshot& snp)
partition_snapshot_row_cursor(const schema& s, partition_snapshot& snp)
: _schema(s)
, _region(region)
, _snp(snp)
, _position(position_in_partition::static_row_tag_t{})
{ }
@@ -85,7 +82,7 @@ public:
return _current_row[0].it;
}
bool up_to_date() const {
return _region.reclaim_counter() == _last_reclaim_count && _last_versions_count == _snp.version_count();
return _snp.get_change_mark() == _change_mark;
}
// Brings back the cursor to validity.
@@ -130,8 +127,7 @@ public:
++version_no;
}
boost::range::make_heap(_heap, heap_less);
_last_reclaim_count = _region.reclaim_counter();
_last_versions_count = _snp.version_count();
_change_mark = _snp.get_change_mark();
bool found = no_clustering_row_between(_schema, lower_bound, _heap[0].it->position());
recreate_current_row();
return found;

View File

@@ -130,13 +130,15 @@ tombstone partition_entry::partition_tombstone() const {
}
partition_snapshot::~partition_snapshot() {
if (_version && _version.is_unique_owner()) {
auto v = &*_version;
_version = {};
remove_or_mark_as_unique_owner(v);
} else if (_entry) {
_entry->_snapshot = nullptr;
}
with_allocator(_region.allocator(), [this] {
if (_version && _version.is_unique_owner()) {
auto v = &*_version;
_version = {};
remove_or_mark_as_unique_owner(v);
} else if (_entry) {
_entry->_snapshot = nullptr;
}
});
}
void partition_snapshot::merge_partition_versions() {
@@ -530,13 +532,14 @@ void partition_entry::upgrade(schema_ptr from, schema_ptr to)
remove_or_mark_as_unique_owner(old_version);
}
lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema, partition_snapshot::phase_type phase)
lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
schema_ptr entry_schema, partition_snapshot::phase_type phase)
{
open_version(*entry_schema, phase);
if (_snapshot) {
return _snapshot->shared_from_this();
} else {
auto snp = make_lw_shared<partition_snapshot>(entry_schema, this, phase);
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, this, phase);
_snapshot = snp.get();
return snp;
}
@@ -572,3 +575,13 @@ std::ostream& operator<<(std::ostream& out, partition_entry& e) {
out << "}";
return out;
}
void partition_entry::evict() noexcept {
if (!_version) {
return;
}
for (auto&& v : versions()) {
v.partition().evict();
}
current_allocator().invalidate_references();
}

View File

@@ -191,19 +191,42 @@ public:
using phase_type = uint64_t;
static constexpr phase_type default_phase = 0;
static constexpr phase_type max_phase = std::numeric_limits<phase_type>::max();
public:
// Used for determining reference stability.
// References and iterators into versions owned by the snapshot,
// which were obtained between two points in time at which the
// snapshot produced equal change_mark objects, are guaranteed
// to still be valid.
class change_mark {
uint64_t _reclaim_count = 0;
size_t _versions_count = 0; // merge_partition_versions() removes versions on merge
private:
friend class partition_snapshot;
change_mark(uint64_t reclaim_count, size_t versions_count)
: _reclaim_count(reclaim_count), _versions_count(versions_count) {}
public:
change_mark() = default;
bool operator==(const change_mark& m) const {
return _reclaim_count == m._reclaim_count && _versions_count == m._versions_count;
}
bool operator!=(const change_mark& m) const {
return !(*this == m);
}
};
private:
schema_ptr _schema;
// Either _version or _entry is non-null.
partition_version_ref _version;
partition_entry* _entry;
phase_type _phase;
logalloc::region& _region;
friend class partition_entry;
public:
explicit partition_snapshot(schema_ptr s,
logalloc::region& region,
partition_entry* entry,
phase_type phase = default_phase)
: _schema(std::move(s)), _entry(entry), _phase(phase) { }
: _schema(std::move(s)), _entry(entry), _phase(phase), _region(region) { }
partition_snapshot(const partition_snapshot&) = delete;
partition_snapshot(partition_snapshot&&) = delete;
partition_snapshot& operator=(const partition_snapshot&) = delete;
@@ -218,6 +241,10 @@ public:
partition_version_ref& version();
change_mark get_change_mark() {
return {_region.reclaim_counter(), version_count()};
}
const partition_version_ref& version() const;
partition_version_range versions() {
@@ -280,6 +307,10 @@ public:
return *this;
}
// Removes all data, marking affected ranges as discontinuous.
// Includes versions referenced by snapshots.
void evict() noexcept;
partition_version_ref& version() {
return _version;
}
@@ -338,7 +369,7 @@ public:
void upgrade(schema_ptr from, schema_ptr to);
// Snapshots with different values of phase will point to different partition_version objects.
lw_shared_ptr<partition_snapshot> read(schema_ptr entry_schema,
lw_shared_ptr<partition_snapshot> read(logalloc::region& region, schema_ptr entry_schema,
partition_snapshot::phase_type phase = partition_snapshot::default_phase);
friend std::ostream& operator<<(std::ostream& out, partition_entry& e);

View File

@@ -76,7 +76,6 @@ cache_tracker::cache_tracker() {
evict_last(_lru);
--_stats.partitions;
++_stats.partition_evictions;
++_stats.modification_count;
return memory::reclaiming_result::reclaimed_something;
} catch (std::bad_alloc&) {
// Bad luck, linearization during partition removal caused us to
@@ -139,7 +138,7 @@ void cache_tracker::clear() {
});
_stats.partition_removals += _stats.partitions;
_stats.partitions = 0;
++_stats.modification_count;
allocator().invalidate_references();
}
void cache_tracker::touch(cache_entry& e) {
@@ -153,14 +152,13 @@ void cache_tracker::touch(cache_entry& e) {
void cache_tracker::insert(cache_entry& entry) {
++_stats.partition_insertions;
++_stats.partitions;
++_stats.modification_count;
_lru.push_front(entry);
}
void cache_tracker::on_erase() {
--_stats.partitions;
++_stats.partition_removals;
++_stats.modification_count;
allocator().invalidate_references();
}
void cache_tracker::on_merge() {
@@ -219,7 +217,6 @@ class partition_range_cursor final {
dht::ring_position_view _end_pos;
stdx::optional<dht::decorated_key> _last;
uint64_t _last_reclaim_count;
size_t _last_modification_count;
private:
void set_position(cache_entry& e) {
// FIXME: make ring_position_view convertible to ring_position, so we can use e.position()
@@ -240,7 +237,6 @@ public:
, _start_pos(dht::ring_position_view::for_range_start(range))
, _end_pos(dht::ring_position_view::for_range_end(range))
, _last_reclaim_count(std::numeric_limits<uint64_t>::max())
, _last_modification_count(std::numeric_limits<size_t>::max())
{ }
// Ensures that cache entry reference is valid.
@@ -248,10 +244,8 @@ public:
// Returns true if and only if the position of the cursor changed.
// Strong exception guarantees.
bool refresh() {
auto reclaim_count = _cache.get().get_cache_tracker().region().reclaim_counter();
auto modification_count = _cache.get().get_cache_tracker().modification_count();
if (reclaim_count == _last_reclaim_count && modification_count == _last_modification_count) {
auto reclaim_count = _cache.get().get_cache_tracker().allocator().invalidate_counter();
if (reclaim_count == _last_reclaim_count) {
return true;
}
@@ -264,7 +258,6 @@ public:
auto same = !cmp(_start_pos, _it->position());
set_position(*_it);
_last_reclaim_count = reclaim_count;
_last_modification_count = modification_count;
return same;
}
@@ -861,7 +854,7 @@ future<> row_cache::update_invalidating(external_updater eu, memtable& m) {
// This invalidates all row ranges and the static row, leaving only the partition tombstone continuous,
// which has to always be continuous.
cache_entry& e = *cache_i;
e.partition() = partition_entry(mutation_partition::make_incomplete(*e.schema(), mem_e.partition().partition_tombstone()));
e.partition().evict(); // FIXME: evict gradually
} else {
_tracker.clear_continuity(*cache_i);
}
@@ -973,6 +966,10 @@ cache_entry::cache_entry(cache_entry&& o) noexcept
}
}
cache_entry::~cache_entry() {
_pe.evict();
}
void row_cache::set_schema(schema_ptr new_schema) noexcept {
_schema = std::move(new_schema);
}
@@ -991,7 +988,7 @@ streamed_mutation cache_entry::read(row_cache& rc, read_context& reader,
// Assumes reader is in the corresponding partition
streamed_mutation cache_entry::do_read(row_cache& rc, read_context& reader) {
auto snp = _pe.read(_schema, reader.phase());
auto snp = _pe.read(rc._tracker.region(), _schema, reader.phase());
auto ckr = query::clustering_key_filter_ranges::get_ranges(*_schema, reader.slice(), _key.key());
auto sm = make_cache_streamed_mutation(_schema, _key, std::move(ckr), rc, reader.shared_from_this(), std::move(snp));
if (reader.schema()->version() != _schema->version()) {

View File

@@ -124,6 +124,7 @@ public:
}
cache_entry(cache_entry&&) noexcept;
~cache_entry();
bool is_evictable() { return _lru_link.is_linked(); }
const dht::decorated_key& key() const { return _key; }
@@ -202,7 +203,6 @@ public:
uint64_t partition_evictions;
uint64_t partition_removals;
uint64_t partitions;
uint64_t modification_count;
uint64_t mispopulations;
uint64_t underlying_recreations;
uint64_t underlying_partition_skips;
@@ -240,7 +240,6 @@ public:
allocation_strategy& allocator();
logalloc::region& region();
const logalloc::region& region() const;
uint64_t modification_count() const { return _stats.modification_count; }
uint64_t partitions() const { return _stats.partitions; }
const stats& get_stats() const { return _stats; }
};
@@ -355,10 +354,7 @@ private:
previous_entry_pointer() = default; // Represents dht::ring_position_view::min()
previous_entry_pointer(dht::decorated_key key) : _key(std::move(key)) {};
// TODO: Currently inserting an entry to the cache increases
// modification counter. That doesn't seem to be necessary and if we
// didn't do that we could store iterator here to avoid key comparison
// (not to mention avoiding lookups in just_cache_scanning_reader.
// TODO: store iterator here to avoid key comparison
};
template<typename CreateEntry, typename VisitEntry>
@@ -505,6 +501,9 @@ public:
const cache_tracker& get_cache_tracker() const {
return _tracker;
}
cache_tracker& get_cache_tracker() {
return _tracker;
}
void set_schema(schema_ptr) noexcept;
const schema_ptr& schema() const;

View File

@@ -410,7 +410,7 @@ public:
circular_buffer<mutation_fragment> _buffer;
size_t _buffer_size = 0;
protected:
static constexpr size_t max_buffer_size_in_bytes = 8 * 1024;
size_t max_buffer_size_in_bytes = 8 * 1024;
schema_ptr _schema;
dht::decorated_key _key;
@@ -499,6 +499,10 @@ public:
future<mutation_fragment_opt> operator()() {
return _impl->operator()();
}
void set_max_buffer_size(size_t size) {
_impl->max_buffer_size_in_bytes = size;
}
};
// Adapts streamed_mutation to a streamed_mutation which is in forwarding mode.

View File

@@ -34,6 +34,7 @@ boost_tests = [
'types_test',
'keys_test',
'mutation_test',
'mvcc_test',
'schema_registry_test',
'range_test',
'mutation_reader_test',

View File

@@ -189,7 +189,7 @@ public:
return rc._read_section(rc._tracker.region(), [&] {
return with_linearized_managed_bytes([&] {
cache_entry& e = rc.find_or_create(dk, {}, rc.phase_of(dk));
return e.partition().read(e.schema());
return e.partition().read(rc._tracker.region(), e.schema());
});
});
}

View File

@@ -171,6 +171,11 @@ public:
return *this;
}
streamed_mutation_assertions& produces(const mutation& m) {
assert_that(mutation_from_streamed_mutation(_sm).get0()).is_equal_to(m);
return *this;
}
streamed_mutation_assertions& produces_only(const std::deque<mutation_fragment>& fragments) {
for (auto&& f : fragments) {
produces(f);

View File

@@ -1580,130 +1580,4 @@ SEASTAR_TEST_CASE(test_continuity_merging) {
assert_that(incomplete + incomplete).has_same_continuity(incomplete);
}
});
}
SEASTAR_TEST_CASE(test_apply_to_incomplete) {
return seastar::async([] {
simple_schema table;
auto&& s = *table.schema();
auto new_mutation = [&] {
return mutation(table.make_pkey(0), table.schema());
};
auto mutation_with_row = [&] (clustering_key ck) {
auto m = new_mutation();
table.add_row(m, ck, "v");
return m;
};
// FIXME: There is no assert_that() for mutation_partition
auto assert_equal = [&] (mutation_partition mp1, mutation_partition mp2) {
auto key = table.make_pkey(0);
assert_that(mutation(table.schema(), key, std::move(mp1)))
.is_equal_to(mutation(table.schema(), key, std::move(mp2)));
};
auto apply = [&] (partition_entry& e, const mutation& m) {
e.apply_to_incomplete(s, partition_entry(m.partition()), s);
};
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
BOOST_TEST_MESSAGE("Check that insert falling into discontinuous range is dropped");
{
auto e = partition_entry(mutation_partition::make_incomplete(s));
auto m = new_mutation();
table.add_row(m, ck1, "v");
apply(e, m);
assert_equal(e.squashed(s), mutation_partition::make_incomplete(s));
}
BOOST_TEST_MESSAGE("Check that continuity from latest version wins");
{
auto m1 = mutation_with_row(ck2);
auto e = partition_entry(m1.partition());
auto snap1 = e.read(table.schema());
auto m2 = mutation_with_row(ck2);
apply(e, m2);
partition_version* latest = &*e.version();
partition_version* prev = latest->next();
for (rows_entry& row : prev->partition().clustered_rows()) {
row.set_continuous(is_continuous::no);
}
auto m3 = mutation_with_row(ck1);
apply(e, m3);
assert_equal(e.squashed(s), (m2 + m3).partition());
// Check that snapshot data is not stolen when its entry is applied
auto e2 = partition_entry(mutation_partition(table.schema()));
e2.apply_to_incomplete(s, std::move(e), s);
assert_equal(snap1->squashed(), m1.partition());
assert_equal(e2.squashed(s), (m2 + m3).partition());
}
});
}
SEASTAR_TEST_CASE(test_schema_upgrade_preserves_continuity) {
return seastar::async([] {
simple_schema table;
auto new_mutation = [&] {
return mutation(table.make_pkey(0), table.schema());
};
auto mutation_with_row = [&] (clustering_key ck) {
auto m = new_mutation();
table.add_row(m, ck, "v");
return m;
};
// FIXME: There is no assert_that() for mutation_partition
auto assert_entry_equal = [&] (schema_ptr e_schema, partition_entry& e, mutation m) {
auto key = table.make_pkey(0);
assert_that(mutation(e_schema, key, e.squashed(*e_schema)))
.is_equal_to(m)
.has_same_continuity(m);
};
auto apply = [&] (schema_ptr e_schema, partition_entry& e, const mutation& m) {
e.apply_to_incomplete(*e_schema, partition_entry(m.partition()), *m.schema());
};
auto m1 = mutation_with_row(table.make_ckey(1));
m1.partition().clustered_rows().begin()->set_continuous(is_continuous::no);
m1.partition().set_static_row_continuous(false);
m1.partition().ensure_last_dummy(*m1.schema());
auto e = partition_entry(m1.partition());
auto rd1 = e.read(table.schema());
auto m2 = mutation_with_row(table.make_ckey(3));
m2.partition().ensure_last_dummy(*m2.schema());
apply(table.schema(), e, m2);
auto new_schema = schema_builder(table.schema()).with_column("__new_column", utf8_type).build();
e.upgrade(table.schema(), new_schema);
rd1 = {};
assert_entry_equal(new_schema, e, m1 + m2);
auto m3 = mutation_with_row(table.make_ckey(2));
apply(new_schema, e, m3);
auto m4 = mutation_with_row(table.make_ckey(0));
table.add_static_row(m4, "s_val");
apply(new_schema, e, m4);
assert_entry_equal(new_schema, e, m1 + m2 + m3);
});
}

256
tests/mvcc_test.cc Normal file
View File

@@ -0,0 +1,256 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <seastar/core/thread.hh>
#include "partition_version.hh"
#include "partition_snapshot_row_cursor.hh"
#include "disk-error-handler.hh"
#include "tests/test-utils.hh"
#include "tests/mutation_assertions.hh"
#include "tests/mutation_reader_assertions.hh"
#include "tests/simple_schema.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
using namespace std::chrono_literals;
SEASTAR_TEST_CASE(test_apply_to_incomplete) {
return seastar::async([] {
logalloc::region r;
simple_schema table;
auto&& s = *table.schema();
auto new_mutation = [&] {
return mutation(table.make_pkey(0), table.schema());
};
auto mutation_with_row = [&] (clustering_key ck) {
auto m = new_mutation();
table.add_row(m, ck, "v");
return m;
};
// FIXME: There is no assert_that() for mutation_partition
auto assert_equal = [&] (mutation_partition mp1, mutation_partition mp2) {
auto key = table.make_pkey(0);
assert_that(mutation(table.schema(), key, std::move(mp1)))
.is_equal_to(mutation(table.schema(), key, std::move(mp2)));
};
auto apply = [&] (partition_entry& e, const mutation& m) {
e.apply_to_incomplete(s, partition_entry(m.partition()), s);
};
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
BOOST_TEST_MESSAGE("Check that insert falling into discontinuous range is dropped");
with_allocator(r.allocator(), [&] {
logalloc::reclaim_lock l(r);
auto e = partition_entry(mutation_partition::make_incomplete(s));
auto m = new_mutation();
table.add_row(m, ck1, "v");
apply(e, m);
assert_equal(e.squashed(s), mutation_partition::make_incomplete(s));
});
BOOST_TEST_MESSAGE("Check that continuity from latest version wins");
with_allocator(r.allocator(), [&] {
logalloc::reclaim_lock l(r);
auto m1 = mutation_with_row(ck2);
auto e = partition_entry(m1.partition());
auto snap1 = e.read(r, table.schema());
auto m2 = mutation_with_row(ck2);
apply(e, m2);
partition_version* latest = &*e.version();
partition_version* prev = latest->next();
for (rows_entry& row : prev->partition().clustered_rows()) {
row.set_continuous(is_continuous::no);
}
auto m3 = mutation_with_row(ck1);
apply(e, m3);
assert_equal(e.squashed(s), (m2 + m3).partition());
// Check that snapshot data is not stolen when its entry is applied
auto e2 = partition_entry(mutation_partition(table.schema()));
e2.apply_to_incomplete(s, std::move(e), s);
assert_equal(snap1->squashed(), m1.partition());
assert_equal(e2.squashed(s), (m2 + m3).partition());
});
});
}
SEASTAR_TEST_CASE(test_schema_upgrade_preserves_continuity) {
return seastar::async([] {
logalloc::region r;
simple_schema table;
auto new_mutation = [&] {
return mutation(table.make_pkey(0), table.schema());
};
auto mutation_with_row = [&] (clustering_key ck) {
auto m = new_mutation();
table.add_row(m, ck, "v");
return m;
};
// FIXME: There is no assert_that() for mutation_partition
auto assert_entry_equal = [&] (schema_ptr e_schema, partition_entry& e, mutation m) {
auto key = table.make_pkey(0);
assert_that(mutation(e_schema, key, e.squashed(*e_schema)))
.is_equal_to(m)
.has_same_continuity(m);
};
auto apply = [&] (schema_ptr e_schema, partition_entry& e, const mutation& m) {
e.apply_to_incomplete(*e_schema, partition_entry(m.partition()), *m.schema());
};
with_allocator(r.allocator(), [&] {
logalloc::reclaim_lock l(r);
auto m1 = mutation_with_row(table.make_ckey(1));
m1.partition().clustered_rows().begin()->set_continuous(is_continuous::no);
m1.partition().set_static_row_continuous(false);
m1.partition().ensure_last_dummy(*m1.schema());
auto e = partition_entry(m1.partition());
auto rd1 = e.read(r, table.schema());
auto m2 = mutation_with_row(table.make_ckey(3));
m2.partition().ensure_last_dummy(*m2.schema());
apply(table.schema(), e, m2);
auto new_schema = schema_builder(table.schema()).with_column("__new_column", utf8_type).build();
e.upgrade(table.schema(), new_schema);
rd1 = {};
assert_entry_equal(new_schema, e, m1 + m2);
auto m3 = mutation_with_row(table.make_ckey(2));
apply(new_schema, e, m3);
auto m4 = mutation_with_row(table.make_ckey(0));
table.add_static_row(m4, "s_val");
apply(new_schema, e, m4);
assert_entry_equal(new_schema, e, m1 + m2 + m3);
});
});
}
SEASTAR_TEST_CASE(test_full_eviction_marks_affected_range_as_discontinuous) {
return seastar::async([] {
logalloc::region r;
with_allocator(r.allocator(), [&] {
logalloc::reclaim_lock l(r);
simple_schema table;
auto&& s = *table.schema();
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
auto e = partition_entry(mutation_partition(table.schema()));
auto t = table.new_tombstone();
auto&& p1 = e.open_version(s).partition();
p1.clustered_row(s, ck2);
p1.apply(t);
auto snap1 = e.read(r, table.schema());
auto&& p2 = e.open_version(s).partition();
p2.clustered_row(s, ck1);
auto snap2 = e.read(r, table.schema());
e.evict();
BOOST_REQUIRE(snap1->squashed().fully_discontinuous(s, position_range(
position_in_partition::before_all_clustered_rows(),
position_in_partition::after_key(ck2)
)));
BOOST_REQUIRE(snap2->squashed().fully_discontinuous(s, position_range(
position_in_partition::before_all_clustered_rows(),
position_in_partition::after_key(ck2)
)));
BOOST_REQUIRE(!snap1->squashed().static_row_continuous());
BOOST_REQUIRE(!snap2->squashed().static_row_continuous());
BOOST_REQUIRE_EQUAL(snap1->squashed().partition_tombstone(), t);
BOOST_REQUIRE_EQUAL(snap2->squashed().partition_tombstone(), t);
});
});
}
SEASTAR_TEST_CASE(test_eviction_with_active_reader) {
return seastar::async([] {
logalloc::region r;
with_allocator(r.allocator(), [&] {
simple_schema table;
auto&& s = *table.schema();
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
auto e = partition_entry(mutation_partition(table.schema()));
auto&& p1 = e.open_version(s).partition();
p1.clustered_row(s, ck2);
p1.ensure_last_dummy(s); // needed by partition_snapshot_row_cursor
auto snap1 = e.read(r, table.schema());
auto&& p2 = e.open_version(s).partition();
p2.clustered_row(s, ck1);
auto snap2 = e.read(r, table.schema());
partition_snapshot_row_cursor cursor(s, *snap2);
cursor.advance_to(position_in_partition_view::before_all_clustered_rows());
BOOST_REQUIRE(cursor.continuous());
BOOST_REQUIRE(cursor.key().equal(s, ck1));
e.evict();
cursor.maybe_refresh();
do {
BOOST_REQUIRE(!cursor.continuous());
BOOST_REQUIRE(cursor.dummy());
} while (cursor.next());
});
});
}

View File

@@ -1311,6 +1311,8 @@ SEASTAR_TEST_CASE(test_mvcc) {
memtable_snapshot_source underlying(s);
partition_key::equality eq(*s);
underlying.apply(m1);
cache_tracker tracker;
row_cache cache(s, snapshot_source([&] { return underlying(); }), tracker);
@@ -1767,3 +1769,164 @@ SEASTAR_TEST_CASE(test_tombstone_merging_in_partial_partition) {
}
});
}
// Drains the reader completely: pulls every streamed_mutation it yields and
// consumes all of that mutation's fragments, discarding everything read.
static void consume_all(mutation_reader& rd) {
    for (;;) {
        streamed_mutation_opt smo = rd().get0();
        if (!smo) {
            break;
        }
        auto& sm = *smo;
        while (sm().get0()) {
            // discard the fragment
        }
    }
}
// Reads the given clustering range (over the full partition range) through
// the cache, discarding the data, so that the range becomes populated in it.
static void populate_range(row_cache& cache, query::clustering_range r = query::full_clustering_range) {
    auto restricted_slice = partition_slice_builder(*cache.schema()).with_range(r).build();
    auto reader = cache.make_reader(cache.schema(), query::full_partition_range, restricted_slice);
    consume_all(reader);
}
// Readers created before an eviction must still produce all the data of the
// snapshot they were opened on, even though the cached rows were evicted.
SEASTAR_TEST_CASE(test_readers_get_all_data_after_eviction) {
    return seastar::async([] {
        simple_schema table;
        schema_ptr s = table.schema();
        memtable_snapshot_source underlying(s);

        auto m1 = table.new_mutation("pk");
        table.add_row(m1, table.make_ckey(3), "v3");

        auto m2 = table.new_mutation("pk");
        table.add_row(m2, table.make_ckey(1), "v1");
        table.add_row(m2, table.make_ckey(2), "v2");

        underlying.apply(m1);
        cache_tracker tracker;
        row_cache cache(s, snapshot_source([&] { return underlying(); }), tracker);
        cache.populate(m1);

        // Applies m to both the underlying source and the cache.
        auto apply = [&] (mutation m) {
            auto mt = make_lw_shared<memtable>(m.schema());
            mt->apply(m);
            cache.update([&] { underlying.apply(m); }, *mt).get();
        };

        // Opens a streamed_mutation over the cache with a minimal buffer so
        // that it cannot prefetch everything before the eviction below.
        auto make_sm = [&] (const query::partition_slice& slice = query::full_slice) {
            auto rd = cache.make_reader(s, query::full_partition_range, slice);
            auto smo = rd().get0();
            BOOST_REQUIRE(smo);
            streamed_mutation& sm = *smo;
            sm.set_max_buffer_size(1);
            return assert_that_stream(std::move(sm));
        };

        auto sm1 = make_sm(); // opened before m2 is applied
        apply(m2);
        auto sm2 = make_sm(); // opened after m2 is applied

        auto slice_with_key2 = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(table.make_ckey(2)))
            .build();
        auto sm3 = make_sm(slice_with_key2);

        cache.evict();

        // Each reader still sees the full data of its own snapshot.
        sm3.produces_row_with_key(table.make_ckey(2))
            .produces_end_of_stream();
        sm1.produces(m1);
        sm2.produces(m1 + m2);
    });
}
//
// Tests the following case of eviction and re-population:
//
// (Before) <ROW key=0> <RT1> <RT2> <RT3> <ROW key=8, cont=1>
// ^--- lower bound ^---- next row
//
// (After) <ROW key=0> <ROW key=8, cont=0> <ROW key=8, cont=1>
// ^--- lower bound ^---- next row
// Verifies that a reader whose next population range is evicted (and possibly
// re-populated, see the diagram above) does not skip the range tombstones that
// lie between its lower bound and the next complete row.
SEASTAR_TEST_CASE(test_tombstones_are_not_missed_when_range_is_invalidated) {
    return seastar::async([] {
        simple_schema s;
        cache_tracker tracker;
        memtable_snapshot_source underlying(s.schema());

        auto pk = s.make_pkey(0);
        auto pr = dht::partition_range::make_singular(pk);

        // One partition: row ck0, three range tombstones covering
        // [1,2], [3,4], [5,6], then row ck8.
        mutation m1(pk, s.schema());
        s.add_row(m1, s.make_ckey(0), "v0");
        auto rt1 = s.make_range_tombstone(query::clustering_range::make(s.make_ckey(1), s.make_ckey(2)),
            s.new_tombstone());
        auto rt2 = s.make_range_tombstone(query::clustering_range::make(s.make_ckey(3), s.make_ckey(4)),
            s.new_tombstone());
        auto rt3 = s.make_range_tombstone(query::clustering_range::make(s.make_ckey(5), s.make_ckey(6)),
            s.new_tombstone());
        m1.partition().apply_delete(*s.schema(), rt1);
        m1.partition().apply_delete(*s.schema(), rt2);
        m1.partition().apply_delete(*s.schema(), rt3);
        s.add_row(m1, s.make_ckey(8), "v8");
        underlying.apply(m1);

        row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker);

        // Opens a streamed_mutation over the cache with a minimal buffer so
        // reads advance one fragment at a time.
        auto make_sm = [&] (const query::partition_slice& slice = query::full_slice) {
            auto rd = cache.make_reader(s.schema(), pr, slice);
            auto smo = rd().get0();
            BOOST_REQUIRE(smo);
            streamed_mutation& sm = *smo;
            sm.set_max_buffer_size(1);
            return assert_that_stream(std::move(sm));
        };

        // populate using reader in same snapshot
        {
            populate_range(cache);
            auto slice_after_7 = partition_slice_builder(*s.schema())
                .with_range(query::clustering_range::make_starting_with(s.make_ckey(7)))
                .build();
            auto sma2 = make_sm(slice_after_7);
            auto sma = make_sm();
            sma.produces_row_with_key(s.make_ckey(0));
            sma.produces_range_tombstone(rt1);

            // Evict while sma is positioned just past rt1; sma2 then
            // re-populates the tail of the partition.
            cache.evict();
            sma2.produces_row_with_key(s.make_ckey(8));
            sma2.produces_end_of_stream();

            // sma must still emit the remaining tombstones and the last row.
            sma.produces_range_tombstone(rt2);
            sma.produces_range_tombstone(rt3);
            sma.produces_row_with_key(s.make_ckey(8));
            sma.produces_end_of_stream();
        }

        // populate using reader created after invalidation
        {
            populate_range(cache);
            auto sma = make_sm();
            sma.produces_row_with_key(s.make_ckey(0));
            sma.produces_range_tombstone(rt1);

            // Invalidate the read range by applying a new write, then
            // re-populate only the tail starting at ck5.
            mutation m2(pk, s.schema());
            s.add_row(m2, s.make_ckey(7), "v7");
            cache.invalidate([&] {
                underlying.apply(m2);
            }).get();
            populate_range(cache, query::clustering_range::make_starting_with(s.make_ckey(5)));

            // sma reads from its original snapshot: rt2, rt3 and row ck8
            // must not be missed (and m2's ck7 must not appear).
            sma.produces_range_tombstone(rt2);
            sma.produces_range_tombstone(rt3);
            sma.produces_row_with_key(s.make_ckey(8));
            sma.produces_end_of_stream();
        }
    });
}

View File

@@ -43,6 +43,9 @@ public:
// Returns a fresh write timestamp; each call yields a strictly larger
// value than the previous one.
api::timestamp_type new_timestamp() {
    auto ts = _timestamp;
    _timestamp += 1;
    return ts;
}
// Returns a tombstone stamped with a fresh timestamp (via new_timestamp()),
// so consecutive calls produce distinct tombstones.
tombstone new_tombstone() {
    return tombstone{new_timestamp(), gc_clock::now()};
}
public:
simple_schema()
: _s(schema_builder("ks", "cf")

View File

@@ -100,6 +100,7 @@ standard_migrator<T> standard_migrator<T>::object;
class allocation_strategy {
protected:
size_t _preferred_max_contiguous_allocation = std::numeric_limits<size_t>::max();
uint64_t _invalidate_counter = 1;
public:
using migrate_fn = const migrate_fn_type*;
@@ -154,6 +155,18 @@ public:
// Returns the preferred upper bound on the size of a single contiguous
// allocation made through this strategy.
size_t preferred_max_contiguous_allocation() const {
    return _preferred_max_contiguous_allocation;
}
// Returns a counter which is incremented whenever references to objects
// managed by this allocator are invalidated, e.g. due to internal events
// like compaction or eviction. As long as the value returned by this method
// doesn't change, references obtained between invocations remain valid.
uint64_t invalidate_counter() const {
    return _invalidate_counter;
}
// Marks all references to objects managed by this allocator as invalidated
// by bumping the counter returned from invalidate_counter().
void invalidate_references() {
    ++_invalidate_counter;
}
};
class standard_allocation_strategy : public allocation_strategy {

View File

@@ -1078,7 +1078,6 @@ private:
bool _reclaiming_enabled = true;
bool _evictable = false;
uint64_t _id;
uint64_t _reclaim_counter = 0;
eviction_fn _eviction_fn;
region_group::region_heap::handle_type _heap_handle;
@@ -1183,7 +1182,7 @@ private:
}
void compact(segment* seg, segment_descriptor& desc) {
++_reclaim_counter;
++_invalidate_counter;
for_each_live(seg, [this] (const object_descriptor* desc, void* obj) {
auto size = desc->live_size(obj);
@@ -1406,7 +1405,7 @@ public:
// Make sure both regions will notice a future increment
// to the reclaim counter
_reclaim_counter = std::max(_reclaim_counter, other._reclaim_counter);
_invalidate_counter = std::max(_invalidate_counter, other._invalidate_counter);
}
// Returns occupancy of the sparsest compactible segment.
@@ -1434,7 +1433,7 @@ public:
}
void migrate_segment(segment* src, segment_descriptor& src_desc, segment* dst, segment_descriptor& dst_desc) {
++_reclaim_counter;
++_invalidate_counter;
size_t segment_size;
if (src != _active) {
_segment_descs.erase(src_desc);
@@ -1503,7 +1502,7 @@ public:
}
memory::reclaiming_result evict_some() {
++_reclaim_counter;
++_invalidate_counter;
return _eviction_fn();
}
@@ -1521,10 +1520,6 @@ public:
return _eviction_fn;
}
uint64_t reclaim_counter() const {
return _reclaim_counter;
}
friend class region;
friend class region_group;
friend class region_group::region_evictable_occupancy_ascending_less_comparator;
@@ -1644,6 +1639,10 @@ allocation_strategy& region::allocator() {
return *_impl;
}
// Const overload: returns the region's allocation strategy implementation.
const allocation_strategy& region::allocator() const {
    return *_impl;
}
void region::set_reclaiming_enabled(bool compactible) {
_impl->set_reclaiming_enabled(compactible);
}
@@ -1652,10 +1651,6 @@ bool region::reclaiming_enabled() const {
return _impl->reclaiming_enabled();
}
uint64_t region::reclaim_counter() const {
return _impl->reclaim_counter();
}
std::ostream& operator<<(std::ostream& out, const occupancy_stats& stats) {
return out << sprint("%.2f%%, %d / %d [B]",
stats.used_fraction() * 100, stats.used_space(), stats.total_space());

View File

@@ -561,6 +561,7 @@ public:
occupancy_stats occupancy() const;
allocation_strategy& allocator();
const allocation_strategy& allocator() const;
region_group* group();
@@ -586,7 +587,9 @@ public:
// Returns a value which is increased when this region is either compacted or
// evicted from, which invalidates references into the region.
// When the value returned by this method doesn't change, references remain valid.
uint64_t reclaim_counter() const;
uint64_t reclaim_counter() const {
return allocator().invalidate_counter();
}
// Makes this region an evictable region. Supplied function will be called
// when data from this region needs to be evicted in order to reclaim space.