Merge 'mvcc: make schema upgrades gentle' from Michał Chojnowski

After a schema change, memtable and cache have to be upgraded to the new schema. Currently, they are upgraded (on the first access after a schema change) atomically, i.e. all rows of the entry are upgraded with one non-preemptible call. This is one of the last vestiges of the times when partitions were treated atomically, and it is a well-known source of numerous large stalls.

This series makes schema upgrades gentle (preemptible). This is done by co-opting the existing MVCC machinery.
Before the series, all partition_versions in the partition_entry chain have the same schema, and an entry upgrade replaces the entire chain with a single squashed and upgraded version.
After the series, each partition_version has its own schema. A partition entry upgrade happens simply by adding an empty version with the new schema to the head of the chain. Row entries are upgraded to the current schema on-the-fly by the cursor during reads, and by the MVCC version merge ongoing in the background after the upgrade.

The series:
1. Does some code cleanup in the mutation_partition area.
2. Adds a schema field to partition_version and removes it from its containers (partition_snapshot, cache_entry, memtable_entry).
3. Adds upgrading variants of constructors and apply() for `row` and its wrappers.
4. Prepares partition_snapshot_row_cursor, mutation_partition_v2::apply_monotonically and partition_snapshot::merge_partition_versions for dealing with heterogeneous version chains.
5. Modifies partition_entry::upgrade to perform upgrades by extending the version chain with a new schema instead of squashing it to a single upgraded version.

Fixes #2577

Closes #13761

* github.com:scylladb/scylladb:
  test: mvcc_test: add a test for gentle schema upgrades
  partition_version: make partition_entry::upgrade() gentle
  partition_version: handle multi-schema snapshots in merge_partition_versions
  mutation_partition_v2: handle schema upgrades in apply_monotonically()
  partition_version: remove the unused "from" argument in partition_entry::upgrade()
  row_cache_test: prepare test_eviction_after_schema_change for gentle schema upgrades
  partition_version: handle multi-schema entries in partition_entry::squashed
  partition_snapshot_row_cursor: handle multi-schema snapshots
  partition_version: prepare partition_snapshot::squashed() for multi-schema snapshots
  partition_version: prepare partition_snapshot::static_row() for multi-schema snapshots
  partition_version: add a logalloc::region argument to partition_entry::upgrade()
  memtable: propagate the region to memtable_entry::upgrade_schema()
  mutation_partition: add an upgrading variant of lazy_row::apply()
  mutation_partition: add an upgrading variant of rows_entry::rows_entry
  mutation_partition: switch an apply() call to apply_monotonically()
  mutation_partition: add an upgrading variant of rows_entry::apply_monotonically()
  mutation_fragment: add an upgrading variant of clustering_row::apply()
  mutation_partition: add an upgrading variant of row::row
  partition_version: remove _schema from partition_entry::operator<<
  partition_version: remove the schema argument from partition_entry::read()
  memtable: remove _schema from memtable_entry
  row_cache: remove _schema from cache_entry
  partition_version: remove the _schema field from partition_snapshot
  partition_version: add a _schema field to partition_version
  mutation_partition: change schema_ptr to schema& in mutation_partition::difference
  mutation_partition: change schema_ptr to schema& in mutation_partition constructor
  mutation_partition_v2: change schema_ptr to schema& in mutation_partition_v2 constructor
  mutation_partition: add upgrading variants of row::apply()
  partition_version: update the comment to apply_to_incomplete()
  mutation_partition_v2: clean up variants of apply()
  mutation_partition: remove apply_weak()
  mutation_partition_v2: remove a misleading comment in apply_monotonically()
  row_cache_test: add schema changes to test_concurrent_reads_and_eviction
  mutation_partition: fix mixed-schema apply()
This commit is contained in:
Tomasz Grabiec
2023-05-24 22:58:43 +02:00
20 changed files with 701 additions and 424 deletions

View File

@@ -487,7 +487,7 @@ mutation_partition& view_updates::partition_for(partition_key&& key) {
if (it != _updates.end()) {
return it->second;
}
return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
return _updates.emplace(std::move(key), mutation_partition(*_view)).first->second;
}
size_t view_updates::op_count() const {

View File

@@ -13,13 +13,13 @@
mutation::data::data(dht::decorated_key&& key, schema_ptr&& schema)
: _schema(std::move(schema))
, _dk(std::move(key))
, _p(_schema)
, _p(*_schema)
{ }
mutation::data::data(partition_key&& key_, schema_ptr&& schema)
: _schema(std::move(schema))
, _dk(dht::decorate_key(*_schema, std::move(key_)))
, _p(_schema)
, _p(*_schema)
{ }
mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, const mutation_partition& mp)

View File

@@ -90,6 +90,9 @@ public:
void apply(const schema& s, const deletable_row& r) {
_row.apply(s, r);
}
void apply(const schema& our_schema, const schema& their_schema, const deletable_row& r) {
_row.apply(our_schema, their_schema, r);
}
position_in_partition_view position() const;

View File

@@ -243,21 +243,6 @@ void mutation_partition::ensure_last_dummy(const schema& s) {
}
}
void mutation_partition::apply(const schema& s, const mutation_partition& p, const schema& p_schema,
mutation_application_stats& app_stats) {
apply_weak(s, p, p_schema, app_stats);
}
void mutation_partition::apply(const schema& s, mutation_partition&& p,
mutation_application_stats& app_stats) {
apply_weak(s, std::move(p), app_stats);
}
void mutation_partition::apply(const schema& s, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats) {
apply_weak(s, p, p_schema, app_stats);
}
struct mutation_fragment_applier {
const schema& _s;
mutation_partition& _mp;
@@ -489,7 +474,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
if (s.version() == p_schema.version()) {
return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, preemptible, res);
} else {
mutation_partition p2(s, p);
mutation_partition p2(p_schema, p);
p2.upgrade(p_schema, s);
return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::no, res); // FIXME: make preemptible
}
@@ -508,7 +493,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
}
void
mutation_partition::apply_weak(const schema& s, mutation_partition_view p,
mutation_partition::apply(const schema& s, mutation_partition_view p,
const schema& p_schema, mutation_application_stats& app_stats) {
// FIXME: Optimize
mutation_partition p2(*this, copy_comparators_only{});
@@ -517,13 +502,13 @@ mutation_partition::apply_weak(const schema& s, mutation_partition_view p,
apply_monotonically(s, std::move(p2), p_schema, app_stats);
}
void mutation_partition::apply_weak(const schema& s, const mutation_partition& p,
void mutation_partition::apply(const schema& s, const mutation_partition& p,
const schema& p_schema, mutation_application_stats& app_stats) {
// FIXME: Optimize
apply_monotonically(s, mutation_partition(s, p), p_schema, app_stats);
apply_monotonically(s, mutation_partition(p_schema, p), p_schema, app_stats);
}
void mutation_partition::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
void mutation_partition::apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
apply_monotonically(s, std::move(p), no_cache_tracker, app_stats);
}
@@ -1161,22 +1146,42 @@ deletable_row::equal(column_kind kind, const schema& s, const deletable_row& oth
return _cells.equal(kind, s, other._cells, other_schema);
}
void deletable_row::apply(const schema& s, const deletable_row& src) {
apply_monotonically(s, src);
}
void deletable_row::apply(const schema& s, deletable_row&& src) {
apply_monotonically(s, std::move(src));
}
void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) {
_cells.apply(s, column_kind::regular_column, src._cells);
void deletable_row::apply(const schema& s, const deletable_row& src) {
apply_monotonically(s, src);
}
void deletable_row::apply(const schema& our_schema, const schema& their_schema, deletable_row&& src) {
apply_monotonically(our_schema, their_schema, std::move(src));
}
void deletable_row::apply(const schema& our_schema, const schema& their_schema, const deletable_row& src) {
apply_monotonically(our_schema, their_schema, src);
}
void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) {
_cells.apply_monotonically(s, column_kind::regular_column, std::move(src._cells));
_marker.apply(src._marker);
_deleted_at.apply(src._deleted_at, _marker);
}
void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) {
_cells.apply(s, column_kind::regular_column, std::move(src._cells));
void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) {
_cells.apply_monotonically(s, column_kind::regular_column, src._cells);
_marker.apply(src._marker);
_deleted_at.apply(src._deleted_at, _marker);
}
void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src) {
_cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, std::move(src._cells));
_marker.apply(src._marker);
_deleted_at.apply(src._deleted_at, _marker);
}
void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src) {
_cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, src._cells);
_marker.apply(src._marker);
_deleted_at.apply(src._deleted_at, _marker);
}
@@ -1576,6 +1581,16 @@ row::row(const schema& s, column_kind kind, const row& o) : _size(o._size)
_cells.clone_from(o._cells, clone_cell_and_hash);
}
row row::construct(const schema& our_schema, const schema& their_schema, column_kind kind, const row& o) {
if (our_schema.version() == their_schema.version()) {
return row(our_schema, kind, o);
} else {
row r;
r.apply(our_schema, their_schema, kind, o);
return r;
}
}
row::~row() {
}
@@ -1640,7 +1655,32 @@ row& row::operator=(row&& other) noexcept {
return *this;
}
void row::apply(const schema& s, column_kind kind, row&& other) {
apply_monotonically(s, kind, std::move(other));
}
void row::apply(const schema& s, column_kind kind, const row& other) {
apply_monotonically(s, kind, other);
}
void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) {
apply_monotonically(our_schema, their_schema, kind, std::move(other));
};
void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) {
apply_monotonically(our_schema, their_schema, kind, other);
};
void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
if (other.empty()) {
return;
}
other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
});
}
void row::apply_monotonically(const schema& s, column_kind kind, const row& other) {
if (other.empty()) {
return;
}
@@ -1649,16 +1689,29 @@ void row::apply(const schema& s, column_kind kind, const row& other) {
});
}
void row::apply(const schema& s, column_kind kind, row&& other) {
apply_monotonically(s, kind, std::move(other));
}
void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
if (other.empty()) {
return;
void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) {
if (our_schema.version() == their_schema.version()) {
return apply_monotonically(our_schema, kind, std::move(other));
}
other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
const column_definition& their_col = their_schema.column_at(kind, id);
const column_definition* our_col = our_schema.get_column_definition(their_col.name());
if (our_col) {
converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell);
}
});
}
void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) {
if (our_schema.version() == their_schema.version()) {
return apply_monotonically(our_schema, kind, other);
}
other.for_each_cell([&] (column_id id, const cell_and_hash& c_a_h) {
const column_definition& their_col = their_schema.column_at(kind, id);
const column_definition* our_col = our_schema.get_column_definition(their_col.name());
if (our_col) {
converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell);
}
});
}
@@ -1873,19 +1926,19 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
return !is_missing() && _ttl != dead;
}
mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const
mutation_partition mutation_partition::difference(const schema& s, const mutation_partition& other) const
{
check_schema(*s);
check_schema(s);
mutation_partition mp(s);
if (_tombstone > other._tombstone) {
mp.apply(_tombstone);
}
mp._static_row = _static_row.difference(*s, column_kind::static_column, other._static_row);
mp._static_row = _static_row.difference(s, column_kind::static_column, other._static_row);
mp._row_tombstones = _row_tombstones.difference(*s, other._row_tombstones);
mp._row_tombstones = _row_tombstones.difference(s, other._row_tombstones);
auto it_r = other._rows.begin();
rows_entry::compare cmp_r(*s);
rows_entry::compare cmp_r(s);
for (auto&& r : _rows) {
if (r.dummy()) {
continue;
@@ -1893,12 +1946,12 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p
while (it_r != other._rows.end() && (it_r->dummy() || cmp_r(*it_r, r))) {
++it_r;
}
if (it_r == other._rows.end() || !it_r->key().equal(*s, r.key())) {
mp.insert_row(*s, r.key(), r.row());
if (it_r == other._rows.end() || !it_r->key().equal(s, r.key())) {
mp.insert_row(s, r.key(), r.row());
} else {
auto dr = r.row().difference(*s, column_kind::regular_column, it_r->row());
auto dr = r.row().difference(s, column_kind::regular_column, it_r->row());
if (!dr.empty()) {
mp.insert_row(*s, r.key(), std::move(dr));
mp.insert_row(s, r.key(), std::move(dr));
}
}
}
@@ -1936,7 +1989,7 @@ void mutation_partition::accept(const schema& s, mutation_partition_visitor& v)
void
mutation_partition::upgrade(const schema& old_schema, const schema& new_schema) {
// We need to copy to provide strong exception guarantees.
mutation_partition tmp(new_schema.shared_from_this());
mutation_partition tmp(new_schema);
tmp.set_static_row_continuous(_static_row_continuous);
converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp);
accept(old_schema, v);

View File

@@ -97,6 +97,7 @@ public:
row();
~row();
row(const schema&, column_kind, const row&);
static row construct(const schema& our_schema, const schema& their_schema, column_kind, const row&);
row(row&& other) noexcept;
row& operator=(row&& other) noexcept;
size_t size() const { return _size; }
@@ -178,10 +179,15 @@ public:
// Weak exception guarantees
void apply(const schema&, column_kind, const row& src);
// Weak exception guarantees
void apply(const schema&, column_kind, row&& src);
void apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other);
void apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other);
// Monotonic exception guarantees
void apply_monotonically(const schema&, column_kind, row&& src);
void apply_monotonically(const schema&, column_kind, const row& src);
void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind, row&& src);
void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind, const row& src);
// Expires cells based on query_time. Expires tombstones based on gc_before
// and max_purgeable. Removes cells covered by tomb.
@@ -416,6 +422,14 @@ public:
get_existing().apply_monotonically(s, kind, std::move(src.get_existing()));
}
// Monotonic exception guarantees
void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, lazy_row&& src) {
if (src.empty()) {
return;
}
maybe_create().apply_monotonically(our_schema, their_schema, kind, std::move(src.get_existing()));
}
// Expires cells based on query_time. Expires tombstones based on gc_before
// and max_purgeable. Removes cells covered by tomb.
// Returns true iff there are any live cells left.
@@ -820,6 +834,11 @@ public:
, _marker(other._marker)
, _cells(s, column_kind::regular_column, other._cells)
{ }
deletable_row(const schema& our_schema, const schema& their_schema, const deletable_row& other)
: _deleted_at(other._deleted_at)
, _marker(other._marker)
, _cells(row::construct(our_schema, their_schema, column_kind::regular_column, other._cells))
{ }
deletable_row(row_tombstone&& tomb, row_marker&& marker, row&& cells)
: _deleted_at(std::move(tomb)), _marker(std::move(marker)), _cells(std::move(cells))
{}
@@ -852,10 +871,15 @@ public:
// Weak exception guarantees. After exception, both src and this will commute to the same value as
// they would should the exception not happen.
void apply(const schema& s, const deletable_row& src);
void apply(const schema& s, deletable_row&& src);
void apply_monotonically(const schema& s, const deletable_row& src);
void apply_monotonically(const schema& s, deletable_row&& src);
void apply(const schema&, deletable_row&& src);
void apply(const schema&, const deletable_row& src);
void apply(const schema& our_schema, const schema& their_schema, const deletable_row& src);
void apply(const schema& our_schema, const schema& their_schema, deletable_row&& src);
void apply_monotonically(const schema&, deletable_row&& src);
void apply_monotonically(const schema&, const deletable_row& src);
void apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src);
void apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src);
public:
row_tombstone deleted_at() const { return _deleted_at; }
api::timestamp_type created_at() const { return _marker.timestamp(); }
@@ -956,6 +980,12 @@ public:
, _range_tombstone(e._range_tombstone)
, _flags(e._flags)
{ }
rows_entry(const schema& our_schema, const schema& their_schema, const rows_entry& e)
: _key(e._key)
, _row(our_schema, their_schema, e._row)
, _range_tombstone(e._range_tombstone)
, _flags(e._flags)
{ }
// Valid only if !dummy()
clustering_key& key() {
return _key;
@@ -989,7 +1019,10 @@ public:
_row.apply(t);
}
void apply_monotonically(const schema& s, rows_entry&& e) {
_row.apply(s, std::move(e._row));
_row.apply_monotonically(s, std::move(e._row));
}
void apply_monotonically(const schema& our_schema, const schema& their_schema, rows_entry&& e) {
_row.apply_monotonically(our_schema, their_schema, std::move(e._row));
}
bool empty() const {
return _row.empty();
@@ -1193,11 +1226,11 @@ public:
static mutation_partition make_incomplete(const schema& s, tombstone t = {}) {
return mutation_partition(incomplete_tag(), s, t);
}
mutation_partition(schema_ptr s)
mutation_partition(const schema& s)
: _rows()
, _row_tombstones(*s)
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s->version())
, _schema_version(s.version())
#endif
{ }
mutation_partition(mutation_partition& other, copy_comparators_only)
@@ -1279,14 +1312,14 @@ public:
// is not representable in this_schema is dropped, thus apply() loses commutativity.
//
// Weak exception guarantees.
// Assumes this and p are not owned by a cache_tracker.
void apply(const schema& this_schema, const mutation_partition& p, const schema& p_schema,
mutation_application_stats& app_stats);
// Use in case this instance and p share the same schema.
// Same guarantees as apply(const schema&, mutation_partition&&, const schema&);
void apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats);
// Same guarantees and constraints as for apply(const schema&, const mutation_partition&, const schema&).
void apply(const schema& this_schema, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats);
// Use in case this instance and p share the same schema.
// Same guarantees and constraints as for other variants of apply().
void apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats);
// Applies p to this instance.
//
@@ -1321,15 +1354,6 @@ public:
stop_iteration apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema,
mutation_application_stats& app_stats);
// Weak exception guarantees.
// Assumes this and p are not owned by a cache_tracker.
void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema,
mutation_application_stats& app_stats);
void apply_weak(const schema& s, mutation_partition&&,
mutation_application_stats& app_stats);
void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats);
// Converts partition to the new schema. When succeeds the partition should only be accessed
// using the new schema.
//
@@ -1397,7 +1421,7 @@ public:
// Returns the minimal mutation_partition that when applied to "other" will
// create a mutation_partition equal to the sum of other and this one.
// This and other must both be governed by the same schema s.
mutation_partition difference(schema_ptr s, const mutation_partition& other) const;
mutation_partition difference(const schema& s, const mutation_partition& other) const;
// Returns a subset of this mutation holding only information relevant for given clustering ranges.
// Range tombstones will be trimmed to the boundaries of the clustering ranges.

View File

@@ -66,7 +66,7 @@ mutation_partition_v2::mutation_partition_v2(const schema& s, mutation_partition
auto&& tombstones = x.mutable_row_tombstones();
if (!tombstones.empty()) {
try {
mutation_partition_v2 p(s.shared_from_this());
mutation_partition_v2 p(s);
for (auto&& t: tombstones) {
range_tombstone & rt = t.tombstone();
@@ -75,8 +75,7 @@ mutation_partition_v2::mutation_partition_v2(const schema& s, mutation_partition
.set_range_tombstone(rt.tomb);
}
mutation_application_stats app_stats;
apply_monotonically(s, std::move(p), s, app_stats);
apply(s, std::move(p));
} catch (...) {
_rows.clear_and_dispose(current_deleter<rows_entry>());
throw;
@@ -118,20 +117,28 @@ struct fmt::formatter<apply_resume> : fmt::formatter<std::string_view> {
}
};
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
mutation_application_stats& app_stats, is_preemptible preemptible, apply_resume& res, is_evictable evictable) {
return apply_monotonically(s, std::move(p), tracker, app_stats,
preemptible ? default_preemption_check() : never_preempt(), res, evictable);
void mutation_partition_v2::apply(const schema& s, mutation_partition&& p) {
apply(s, mutation_partition_v2(s, std::move(p)));
}
void mutation_partition_v2::apply(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker, is_evictable evictable) {
mutation_application_stats app_stats;
apply_resume res;
apply_monotonically(s, s, std::move(p), tracker, app_stats, never_preempt(), res, evictable);
}
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, const schema& p_s, mutation_partition_v2&& p, cache_tracker* tracker,
mutation_application_stats& app_stats, preemption_check need_preempt, apply_resume& res, is_evictable evictable) {
#ifdef SEASTAR_DEBUG
assert(s.version() == _schema_version);
assert(p._schema_version == _schema_version);
assert(_schema_version == s.version());
assert(p._schema_version == p_s.version());
#endif
bool same_schema = s.version() == p_s.version();
_tombstone.apply(p._tombstone);
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
if (same_schema) [[likely]] {
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
} else {
_static_row.apply_monotonically(s, p_s, column_kind::static_column, std::move(p._static_row));
}
_static_row_continuous |= p._static_row_continuous;
rows_entry::tri_compare cmp(s);
@@ -215,7 +222,6 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
alloc_strategy_unique_ptr<rows_entry> p_sentinel;
alloc_strategy_unique_ptr<rows_entry> this_sentinel;
auto insert_sentinel_back = defer([&] {
// Insert this_sentinel before sentinel so that the former lands before the latter in LRU.
if (this_sentinel) {
assert(p_i != p._rows.end());
auto rt = this_sentinel->range_tombstone();
@@ -307,8 +313,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
if (need_preempt()) {
auto s1 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s,
position_in_partition::after_key(s, lb_i->position()), is_dummy::yes, is_continuous::no));
current_allocator().construct<rows_entry>(p_s,
position_in_partition::after_key(p_s, lb_i->position()), is_dummy::yes, is_continuous::no));
alloc_strategy_unique_ptr<rows_entry> s2;
if (lb_i->position().is_clustering_row()) {
s2 = alloc_strategy_unique_ptr<rows_entry>(
@@ -347,8 +353,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
if (next_interval_loaded) {
// FIXME: Avoid reallocation
s1 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s,
position_in_partition::after_key(s, src_e.position()), is_dummy::yes, is_continuous::no));
current_allocator().construct<rows_entry>(p_s,
position_in_partition::after_key(p_s, src_e.position()), is_dummy::yes, is_continuous::no));
if (src_e.position().is_clustering_row()) {
s2 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s,
@@ -361,31 +367,41 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
}
}
rows_type::key_grabber pi_kg(p_i);
lb_i = _rows.insert_before(i, std::move(pi_kg));
if (same_schema) [[likely]] {
rows_type::key_grabber pi_kg(p_i);
lb_i = _rows.insert_before(i, std::move(pi_kg));
} else {
// FIXME: avoid cell reallocation.
// We are copying the row to make exception safety simpler,
// but it's not inherently necessary and could be avoided.
auto new_e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, p_s, src_e));
lb_i = _rows.insert_before(i, std::move(new_e));
lb_i->swap(src_e);
p_i = p._rows.erase_and_dispose(p_i, del);
}
p_sentinel = std::move(s1);
this_sentinel = std::move(s2);
// Check if src_e falls into a continuous range.
// Check if src_e (now: lb_i) fell into a continuous range.
// The range past the last entry is also always implicitly continuous.
if (i == _rows.end() || i->continuous()) {
tombstone i_rt = i != _rows.end() ? i->range_tombstone() : tombstone();
// Cannot apply only-row range tombstone falling into a continuous range without inserting extra entry.
// Should not occur in practice due to the "older versions are evicted first" rule.
// Never occurs in non-evictable snapshots because they are continuous.
if (!src_e.continuous() && src_e.range_tombstone() > i_rt) {
if (src_e.dummy()) {
if (!lb_i->continuous() && lb_i->range_tombstone() > i_rt) {
if (lb_i->dummy()) {
lb_i->set_range_tombstone(i_rt);
} else {
position_in_partition_view i_pos = i != _rows.end() ? i->position()
: position_in_partition_view::after_all_clustered_rows();
// See the "no singular tombstones" rule.
mplog.error("Cannot merge entry {} with rt={}, cont=0 into continuous range before {} with rt={}",
src_e.position(), src_e.range_tombstone(), i_pos, i_rt);
lb_i->position(), lb_i->range_tombstone(), i_pos, i_rt);
abort();
}
} else {
lb_i->set_range_tombstone(src_e.range_tombstone() + i_rt);
lb_i->set_range_tombstone(lb_i->range_tombstone() + i_rt);
}
lb_i->set_continuous(true);
}
@@ -397,8 +413,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
if (next_interval_loaded) {
// FIXME: Avoid reallocation
s1 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s,
position_in_partition::after_key(s, src_e.position()), is_dummy::yes, is_continuous::no));
current_allocator().construct<rows_entry>(p_s,
position_in_partition::after_key(p_s, src_e.position()), is_dummy::yes, is_continuous::no));
if (src_e.position().is_clustering_row()) {
s2 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, s1->position(), is_dummy::yes, is_continuous::yes));
@@ -436,18 +452,26 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
}
}
if (tracker) {
// Newer evictable versions store complete rows
i->row() = std::move(src_e.row());
// Need to preserve the LRU link of the later version in case it's
// the last dummy entry which holds the partition entry linked in LRU.
i->swap(src_e);
if (same_schema) [[likely]] {
// Newer evictable versions store complete rows
i->row() = std::move(src_e.row());
// Need to preserve the LRU link of the later version in case it's
// the last dummy entry which holds the partition entry linked in LRU.
i->swap(src_e);
} else {
i->apply_monotonically(s, p_s, std::move(src_e));
}
tracker->remove(src_e);
} else {
// Avoid row compaction if no newer range tombstone.
do_compact = (src_e.range_tombstone() + src_e.row().deleted_at().regular()) >
(i->range_tombstone() + i->row().deleted_at().regular());
memory::on_alloc_point();
i->apply_monotonically(s, std::move(src_e));
if (same_schema) [[likely]] {
i->apply_monotonically(s, std::move(src_e));
} else {
i->apply_monotonically(s, p_s, std::move(src_e));
}
}
++app_stats.row_hits;
p_i = p._rows.erase_and_dispose(p_i, del);
@@ -491,59 +515,6 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
return stop_iteration::yes;
}
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
mutation_application_stats& app_stats, is_preemptible preemptible, apply_resume& res, is_evictable evictable) {
if (s.version() == p_schema.version()) {
return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats,
preemptible ? default_preemption_check() : never_preempt(), res, evictable);
} else {
mutation_partition_v2 p2(s, p);
p2.upgrade(p_schema, s);
return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, never_preempt(), res, evictable); // FIXME: make preemptible
}
}
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker *tracker,
mutation_application_stats& app_stats, is_evictable evictable) {
apply_resume res;
return apply_monotonically(s, std::move(p), tracker, app_stats, is_preemptible::no, res, evictable);
}
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
mutation_application_stats& app_stats) {
apply_resume res;
return apply_monotonically(s, std::move(p), p_schema, app_stats, is_preemptible::no, res, is_evictable::no);
}
void mutation_partition_v2::apply(const schema& s, const mutation_partition_v2& p, const schema& p_schema,
mutation_application_stats& app_stats) {
apply_monotonically(s, mutation_partition_v2(p_schema, std::move(p)), p_schema, app_stats);
}
// Applies p to this partition; both this and p must conform to schema s.
// p is consumed (left in a moved-from state).
void mutation_partition_v2::apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats) {
    apply_monotonically(s, mutation_partition_v2(s, std::move(p)), no_cache_tracker, app_stats, is_evictable::no);
}
// Applies a serialized partition view p (described by p_schema) to this
// partition (described by s).
void
mutation_partition_v2::apply_weak(const schema& s, mutation_partition_view p,
        const schema& p_schema, mutation_application_stats& app_stats) {
    // FIXME: Optimize
    // Materialize the view into a v1 mutation_partition via the builder
    // visitor, then apply the materialized copy.
    mutation_partition p2(p_schema.shared_from_this());
    partition_builder b(p_schema, p2);
    p.accept(p_schema, b);
    apply_monotonically(s, mutation_partition_v2(p_schema, std::move(p2)), p_schema, app_stats);
}
// Applies a copy of the v1 partition p (described by p_schema) to this
// partition (described by s).
void mutation_partition_v2::apply_weak(const schema& s, const mutation_partition& p,
        const schema& p_schema, mutation_application_stats& app_stats) {
    // FIXME: Optimize
    // NOTE(review): p is described by p_schema, yet the temporary v2 wrapper
    // is constructed with s while p_schema is still passed as p's schema —
    // the sibling overloads construct with p_schema. Looks inconsistent;
    // confirm against the (schema, mutation_partition) constructor contract.
    apply_monotonically(s, mutation_partition_v2(s, p), p_schema, app_stats);
}
void mutation_partition_v2::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
apply_monotonically(s, mutation_partition_v2(s, std::move(p)), no_cache_tracker, app_stats, is_evictable::no);
}
void
mutation_partition_v2::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) {
check_schema(schema);
@@ -555,10 +526,9 @@ mutation_partition_v2::apply_row_tombstone(const schema& schema, clustering_key_
// Applies a range tombstone to this partition (described by schema).
// Implemented by building a one-tombstone v1 partition and merging it in.
//
// Fix: the botched merge left both the pre-refactor lines
// (mp(schema.shared_from_this()) / apply_weak with stats) and their
// post-refactor replacements interleaved, redefining `mp` and applying the
// tombstone twice. Keep only the post-refactor variant, consistent with the
// new apply(const schema&, mutation_partition&&) overload.
void
mutation_partition_v2::apply_row_tombstone(const schema& schema, range_tombstone rt) {
    check_schema(schema);
    mutation_partition mp(schema);
    mp.apply_row_tombstone(schema, std::move(rt));
    apply(schema, std::move(mp));
}
void
@@ -965,7 +935,7 @@ void mutation_partition_v2::accept(const schema& s, mutation_partition_visitor&
void
mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schema) {
// We need to copy to provide strong exception guarantees.
mutation_partition tmp(new_schema.shared_from_this());
mutation_partition tmp(new_schema);
tmp.set_static_row_continuous(_static_row_continuous);
converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp);
accept(old_schema, v);
@@ -973,7 +943,7 @@ mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schem
}
mutation_partition mutation_partition_v2::as_mutation_partition(const schema& s) const {
mutation_partition tmp(s.shared_from_this());
mutation_partition tmp(s);
tmp.set_static_row_continuous(_static_row_continuous);
partition_builder v(s, tmp);
accept(s, v);

View File

@@ -89,10 +89,10 @@ public:
static mutation_partition_v2 make_incomplete(const schema& s, tombstone t = {}) {
return mutation_partition_v2(incomplete_tag(), s, t);
}
mutation_partition_v2(schema_ptr s)
mutation_partition_v2(const schema& s)
: _rows()
#ifdef SEASTAR_DEBUG
, _schema_version(s->version())
, _schema_version(s.version())
#endif
{ }
mutation_partition_v2(mutation_partition_v2& other, copy_comparators_only)
@@ -167,18 +167,11 @@ public:
// prefix must not be full
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
void apply_row_tombstone(const schema& schema, range_tombstone rt);
//
// Applies p to current object.
//
// Commutative when this_schema == p_schema. If schemas differ, data in p which
// is not representable in this_schema is dropped, thus apply() loses commutativity.
//
// Weak exception guarantees.
void apply(const schema& this_schema, const mutation_partition_v2& p, const schema& p_schema,
mutation_application_stats& app_stats);
// Use in case this instance and p share the same schema.
// Same guarantees as apply(const schema&, mutation_partition_v2&&, const schema&);
void apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats);
// Assumes this and p are not owned by a cache_tracker and non-evictable.
void apply(const schema& s, mutation_partition&&);
void apply(const schema& s, mutation_partition_v2&& p, cache_tracker* = nullptr, is_evictable evictable = is_evictable::no);
// Applies p to this instance.
//
@@ -204,25 +197,8 @@ public:
//
// If is_preemptible::yes is passed, apply_resume must also be passed,
// same instance each time until stop_iteration::yes is returned.
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*,
mutation_application_stats& app_stats, is_preemptible, apply_resume&, is_evictable);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
mutation_application_stats& app_stats, is_preemptible, apply_resume&, is_evictable);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
mutation_application_stats& app_stats, is_evictable);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
mutation_application_stats& app_stats);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*,
mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable);
// Weak exception guarantees.
// Assumes this and p are not owned by a cache_tracker and non-evictable.
void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema,
mutation_application_stats& app_stats);
void apply_weak(const schema& s, mutation_partition&&,
mutation_application_stats& app_stats);
void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats);
stop_iteration apply_monotonically(const schema& this_schema, const schema& p_schema, mutation_partition_v2&& p,
cache_tracker*, mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable);
// Converts partition to the new schema. When succeeds the partition should only be accessed
// using the new schema.

View File

@@ -34,6 +34,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current, mutation_
partition_version::partition_version(partition_version&& pv) noexcept
: anchorless_list_base_hook(std::move(pv))
, _backref(pv._backref)
, _schema(std::move(pv._schema))
, _is_being_upgraded(pv._is_being_upgraded)
, _partition(std::move(pv._partition))
{
if (_backref) {
@@ -56,15 +58,20 @@ partition_version::~partition_version()
if (_backref) {
_backref->_version = nullptr;
}
with_allocator(standard_allocator(), [&] {
// Destroying the schema_ptr can cause a destruction of the schema,
// so it has to happen in the allocator which schemas are allocated in.
_schema = nullptr;
});
}
// Forwards to mutation_partition_v2::clear_gently().
// stop_iteration::yes means the partition's contents are fully cleared;
// otherwise the caller is expected to invoke it again.
stop_iteration partition_version::clear_gently(cache_tracker* tracker) noexcept {
    return _partition.clear_gently(tracker);
}
size_t partition_version::size_in_allocator(const schema& s, allocation_strategy& allocator) const {
size_t partition_version::size_in_allocator(allocation_strategy& allocator) const {
return allocator.object_memory_size_in_allocator(this) +
partition().external_memory_usage(s);
partition().external_memory_usage(*_schema);
}
namespace {
@@ -111,15 +118,20 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
}
// Returns the static row squashed across all versions visible to this
// snapshot, converted to the schema of the newest version.
//
// Fix: diff residue — the pre-refactor lambda-based implementation
// (::squashed<row>(...)) was left interleaved with the new loop-based one,
// making the function ill-formed. Keep only the multi-schema-aware loop.
::static_row partition_snapshot::static_row(bool digest_requested) const {
    const partition_version* this_v = &*version();
    // last() is the oldest version; prev() steps towards newer ones.
    partition_version* it = this_v->last();
    if (digest_requested) {
        it->partition().static_row().prepare_hash(*it->get_schema(), column_kind::static_column);
    }
    // Seed with the oldest version's static row, upgrading it from that
    // version's schema to the snapshot (newest-version) schema.
    row r = row::construct(*this_v->get_schema(), *it->get_schema(), column_kind::static_column, it->partition().static_row().get());
    while (it != this_v) {
        it = it->prev();
        if (digest_requested) {
            it->partition().static_row().prepare_hash(*it->get_schema(), column_kind::static_column);
        }
        // Newer versions are applied on top, each converted from its own schema.
        r.apply(*this_v->get_schema(), *it->get_schema(), column_kind::static_column, it->partition().static_row().get());
    }
    return ::static_row(std::move(r));
}
bool partition_snapshot::static_row_continuous() const {
@@ -133,15 +145,16 @@ tombstone partition_snapshot::partition_tombstone() const {
}
// Squashes all versions visible to this snapshot into a single v1
// mutation_partition using the schema of the newest version; each version is
// converted from its own schema before being applied.
//
// Fix: diff residue — the pre-refactor ::squashed<mutation_partition>(...)
// implementation was left interleaved with the new loop, making the function
// ill-formed. Keep only the multi-schema-aware loop.
mutation_partition partition_snapshot::squashed() const {
    const partition_version* this_v = &*version();
    mutation_partition mp(*this_v->get_schema());
    // Walk from the oldest version (last()) towards this_v (the newest one
    // visible to this snapshot) so newer data is applied on top.
    for (auto it = this_v->last();; it = it->prev()) {
        mutation_application_stats app_stats;
        mp.apply(*this_v->get_schema(), it->partition().as_mutation_partition(*it->get_schema()), *it->get_schema(), app_stats);
        if (it == this_v) {
            break;
        }
    }
    return mp;
}
tombstone partition_entry::partition_tombstone() const {
@@ -166,19 +179,43 @@ partition_snapshot::~partition_snapshot() {
}
// Merges `newer` into `older` (the cheap, newer-into-older direction) and
// leaves the combined result in `newer`. Both must conform to schema s.
//
// Fix: diff residue — the pre-refactor apply_monotonically call (which took
// app_stats) was left alongside the new apply() call, which would merge
// twice; and its `mutation_application_stats app_stats` local is now unused.
// Keep the new apply() call only and drop the stale local.
void merge_versions(const schema& s, mutation_partition_v2& newer, mutation_partition_v2&& older, cache_tracker* tracker, is_evictable evictable) {
    older.apply(s, std::move(newer), tracker, evictable);
    newer = std::move(older);
}
// Inserts a new version after pv.
// Used only when upgrading the schema of pv.
// `s` is the schema of the new (upgraded) version; `tracker` is non-null for
// evictable entries. Returns a reference to the freshly inserted version,
// which starts empty and inherits pv's static-row continuity.
static partition_version& append_version(partition_version& pv, const schema& s, cache_tracker* tracker) {
    // Every evictable version must have a dummy entry at the end so that
    // it can be tracked in the LRU. It is also needed to allow old versions
    // to stay around (with tombstones and static rows) after fully evicted.
    // Such versions must be fully discontinuous, and thus have a dummy at the end.
    auto new_version = tracker
        ? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s), s.shared_from_this())
        : current_allocator().construct<partition_version>(mutation_partition_v2(s), s.shared_from_this());
    new_version->partition().set_static_row_continuous(pv.partition().static_row_continuous());
    new_version->insert_after(pv);
    if (tracker) {
        tracker->insert(*new_version);
    }
    return *new_version;
}
stop_iteration partition_snapshot::merge_partition_versions(mutation_application_stats& app_stats) {
partition_version_ref& v = version();
if (!v.is_unique_owner()) {
// Shift _version to the oldest unreferenced version and then keep merging left hand side into it.
// This is good for performance because in case we were at the latest version
// we leave it for incoming writes and they don't have to create a new one.
//
// If `current->next()` has a different schema than `current`, it will have
// to be upgraded before being merged with `current`.
// If its upgrade is already in progress, it would be wasteful (though legal)
// to initiate its upgrade again, so we stop shifting.
//
// See the documentation in partition_version.hh for additional info about upgrades.
partition_version* current = &*v;
while (current->next() && !current->next()->is_referenced()) {
while (current->next() && !current->next()->is_referenced() && !current->next()->_is_being_upgraded) {
current = current->next();
_version = partition_version_ref(*current);
_version_merging_state.reset();
@@ -190,8 +227,32 @@ stop_iteration partition_snapshot::merge_partition_versions(mutation_application
if (!_version_merging_state) {
_version_merging_state = apply_resume();
}
const auto do_stop_iteration = current->partition().apply_monotonically(*schema(),
std::move(prev->partition()), _tracker, local_app_stats, is_preemptible::yes, *_version_merging_state,
if (!prev->_is_being_upgraded && prev->get_schema()->version() != current->get_schema()->version()) {
// The versions we are attempting to merge have different schemas.
// In this scenario the older version has to be upgraded before
// being merged with the newer one.
//
// This is done by adding a fresh empty version (with the newer
// schema) after `current` and merging `current` into the new
// version.
//
// While the upgrade is happening, `_is_being_upgraded` is set
// in the version which is being upgraded, to mark it as having
// older schema than its `next()` (and therefore violating the
// normal chronological schema order). This is necessary
// precisely for the above `if`, so that after resuming a
// preempted upgrade we can simply continue, instead of
// (illegally) initiating an upgrade of the special fresh
// version back to the old schema.
//
// See the documentation in partition_version.hh for additional info about upgrades.
current = &append_version(*current, *prev->get_schema(), _tracker);
_version = partition_version_ref(*current);
prev = current->prev();
prev->_is_being_upgraded = true;
}
const auto do_stop_iteration = current->partition().apply_monotonically(*current->get_schema(),
*prev->get_schema(), std::move(prev->partition()), _tracker, local_app_stats, default_preemption_check(), *_version_merging_state,
is_evictable(bool(_tracker)));
app_stats.row_hits += local_app_stats.row_hits;
if (do_stop_iteration == stop_iteration::no) {
@@ -200,6 +261,7 @@ stop_iteration partition_snapshot::merge_partition_versions(mutation_application
// If do_stop_iteration is yes, we have to remove the previous version.
// It now appears as fully continuous because it is empty.
_version_merging_state.reset();
prev->_is_being_upgraded = false;
if (prev->is_referenced()) {
_version.release();
prev->back_reference() = partition_version_ref(*current, prev->back_reference().is_unique_owner());
@@ -222,7 +284,7 @@ stop_iteration partition_snapshot::slide_to_oldest() noexcept {
_entry = nullptr;
}
partition_version* current = &*v;
while (current->next() && !current->next()->is_referenced()) {
while (current->next() && !current->next()->is_referenced() && !current->next()->_is_being_upgraded) {
current = current->next();
_version = partition_version_ref(*current);
}
@@ -239,18 +301,18 @@ unsigned partition_snapshot::version_count()
return count;
}
// Constructs a non-evictable entry holding mp, whose layout is described by s.
//
// Fix: diff residue — the pre-refactor schema-less signatures and
// construct() call were left interleaved with the new ones (each version now
// carries its schema). Keep the new schema-carrying forms only.
partition_entry::partition_entry(const schema& s, mutation_partition_v2 mp)
{
    auto new_version = current_allocator().construct<partition_version>(std::move(mp), s.shared_from_this());
    _version = partition_version_ref(*new_version);
}

// v1 convenience constructor: wrap mp in a v2 partition with the same schema.
partition_entry::partition_entry(const schema& s, mutation_partition mp)
    : partition_entry(s, mutation_partition_v2(s, std::move(mp)))
{ }
partition_entry::partition_entry(partition_entry::evictable_tag, const schema& s, mutation_partition&& mp)
: partition_entry([&] {
: partition_entry(s, [&] {
mp.ensure_last_dummy(s);
return mutation_partition_v2(s, std::move(mp));
}())
@@ -329,8 +391,8 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
// to stay around (with tombstones and static rows) after fully evicted.
// Such versions must be fully discontinuous, and thus have a dummy at the end.
auto new_version = tracker
? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s))
: current_allocator().construct<partition_version>(mutation_partition_v2(s.shared_from_this()));
? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s), s.shared_from_this())
: current_allocator().construct<partition_version>(mutation_partition_v2(s), s.shared_from_this());
new_version->partition().set_static_row_continuous(_version->partition().static_row_continuous());
new_version->insert_before(*_version);
set_version(new_version);
@@ -363,24 +425,24 @@ void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, cons
if (s.version() != mp_schema.version()) {
mp.upgrade(mp_schema, s);
}
auto new_version = current_allocator().construct<partition_version>(std::move(mp));
auto new_version = current_allocator().construct<partition_version>(std::move(mp), s.shared_from_this());
partition_snapshot_ptr snp; // Should die after new_version is inserted
if (!_snapshot) {
try {
apply_resume res;
auto notify = cleaner.make_region_space_guard();
if (_version->partition().apply_monotonically(s,
if (_version->partition().apply_monotonically(s, s,
std::move(new_version->partition()),
no_cache_tracker,
app_stats,
is_preemptible::yes,
default_preemption_check(),
res,
is_evictable::no) == stop_iteration::yes) {
current_allocator().destroy(new_version);
return;
} else {
// Apply was preempted. Let the cleaner finish the job when snapshot dies
snp = read(r, cleaner, s.shared_from_this(), no_cache_tracker);
snp = read(r, cleaner, no_cache_tracker);
// FIXME: Store res in the snapshot as an optimization to resume from where we left off.
}
} catch (...) {
@@ -418,14 +480,14 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
// So we cannot allow erasing when preemptible.
bool can_move = !preemptible && !pe._snapshot;
auto src_snp = pe.read(reg, pe_cleaner, s.shared_from_this(), no_cache_tracker);
auto src_snp = pe.read(reg, pe_cleaner, no_cache_tracker);
partition_snapshot_ptr prev_snp;
if (preemptible) {
// Reads must see prev_snp until whole update completes so that writes
// are not partially visible.
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
prev_snp = read(reg, tracker.cleaner(), &tracker, phase - 1);
}
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
auto dst_snp = read(reg, tracker.cleaner(), &tracker, phase);
dst_snp->lock();
// Once we start updating the partition, we must keep all snapshots until the update completes,
@@ -547,39 +609,41 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
});
}
mutation_partition_v2 partition_entry::squashed(schema_ptr from, schema_ptr to, is_evictable evictable)
mutation_partition_v2 partition_entry::squashed_v2(const schema& to, is_evictable evictable)
{
mutation_partition_v2 mp(to);
mp.set_static_row_continuous(_version->partition().static_row_continuous());
for (auto&& v : _version->all_elements()) {
auto older = mutation_partition_v2(*from, v.partition());
if (from->version() != to->version()) {
older.upgrade(*from, *to);
auto older = mutation_partition_v2(*v.get_schema(), v.partition());
if (v.get_schema()->version() != to.version()) {
older.upgrade(*v.get_schema(), to);
}
merge_versions(*to, mp, std::move(older), no_cache_tracker, evictable);
merge_versions(to, mp, std::move(older), no_cache_tracker, evictable);
}
return mp;
}
mutation_partition partition_entry::squashed(const schema& s, is_evictable evictable)
{
return squashed(s.shared_from_this(), s.shared_from_this(), evictable)
.as_mutation_partition(s);
return squashed_v2(s, evictable).as_mutation_partition(s);
}
void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker)
void partition_entry::upgrade(logalloc::region& r, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker)
{
auto new_version = current_allocator().construct<partition_version>(squashed(from, to, is_evictable(bool(tracker))));
auto old_version = &*_version;
set_version(new_version);
if (tracker) {
tracker->insert(*new_version);
}
remove_or_mark_as_unique_owner(old_version, &cleaner);
with_allocator(r.allocator(), [&] {
auto phase = partition_snapshot::max_phase;
if (_snapshot) {
phase = _snapshot->_phase;
}
// The destruction of this snapshot pointer will trigger a background merge
// of the old version into the new version.
partition_snapshot_ptr snp = read(r, cleaner, tracker, phase);
add_version(*to, tracker);
});
}
partition_snapshot_ptr partition_entry::read(logalloc::region& r,
mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase)
mutation_cleaner& cleaner, cache_tracker* tracker, partition_snapshot::phase_type phase)
{
if (_snapshot) {
if (_snapshot->_phase == phase) {
@@ -594,12 +658,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
return snp;
} else { // phase > _snapshot->_phase
with_allocator(r.allocator(), [&] {
add_version(*entry_schema, tracker);
add_version(*get_schema(), tracker);
});
}
}
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
auto snp = make_lw_shared<partition_snapshot>(r, cleaner, this, tracker, phase);
_snapshot = snp.get();
return partition_snapshot_ptr(std::move(snp));
}
@@ -639,7 +703,7 @@ std::ostream& operator<<(std::ostream& out, const partition_entry::printer& p) {
}
out << ") ";
}
out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(p._schema, v->partition());
out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(*v->get_schema(), v->partition());
v = v->next();
first = false;
}

View File

@@ -80,21 +80,7 @@ class static_row;
// When the partition_snapshot is destroyed partition_versions are squashed
// together to minimize the amount of elements on the list.
//
// Scene IV. Schema upgrade
// pv pv --- pv
// ^ ^ ^
// | | |
// pe ps(u) ps
// When there is a schema upgrade the list of partition versions pointed to
// by partition_entry is replaced by a new single partition_version that is a
// result of squashing and upgrading the old versions.
// Old versions not used by any partition snapshot are removed. The first
// partition snapshot on the list is marked as unique which means that upon
// its destruction it won't attempt to squash versions but instead remove
// the unused ones and pass the "unique owner" mark the next snapshot on the
// list (if there is any).
//
// Scene V. partition_entry eviction
// Scene IV. partition_entry eviction
// pv
// ^
// |
@@ -104,11 +90,110 @@ class static_row;
// upgrade scenario. The unused ones are destroyed right away and the first
// snapshot on the list is marked as unique owner so that on its destruction
// it continues removal of the partition versions.
//
// Schema upgrades
//
// After a schema change (e.g. a column is removed), the layout of existing
// rows in memory becomes outdated and has to be adjusted before they are
// emitted by a query expecting the newer schema.
//
// Rows can be upgraded on the fly during queries. But upgrades have a high CPU
// cost, so we want them to happen only once. The upgraded row should be saved
// in memory so that future queries don't have to upgrade it again.
// And it should replace the old row as soon as possible (when the row
// is no longer reachable through the old schema) to conserve memory.
//
// This behavior is akin to MVCC. A schema upgrade can be thought of as a
// special kind of update which affects all rows, and the MVCC machinery can be
// naturally hijacked to implement it.
//
// Currently, we do it as follows:
//
// - Each MVCC version has its own schema pointer. Versions in the same chain
// can be of different schemas.
//
// - The schema of a partition entry is defined as the schema of the newest version.
// A partition entry upgrade is performed simply by inserting a new empty version with
// the new schema. (And triggering a background version merge by creating and immediately
// destroying a snapshot pointing at the previous newest version).
// Due to this, schemas of versions in the chain are ordered chronologically.
// (The order is important because it's forbidden to upgrade to an older version,
// because that's lossy -- e.g. a new column can be lost).
//
// - On read, the cursor upgrades rows on the fly to the cursor's schema.
// If the cursor reads the latest version, the upgraded rows are written to the latest
// version.
//
// - When versions are merged, rows are upgraded to the newer schema, the result of the
// merge has the newer schema.
//
// This one is tricky. A natural idea is to merge older versions into the newer version,
// (upgrading rows when moving/copying them between versions), so that after a merge
// only the new version is left. But usually we want to merge in the other direction.
//
// (When a database write arrives, we want to merge it into the existing
// older version, so that it has a cost proportional to the size of the
// write, not to the size of the existing version, which can be arbitrarily
// large. Doing otherwise would invite quadratic behaviour)
//
// The merging algorithm is already very complicated and making it work in both
// directions (or adding a separate algorithm specifically for upgrades) would
// complicate things even further.
//
// So instead, when two versions of different schema are merged, the older version
// (which also has the older schema) is first upgraded to the newer schema in a special
// upgrade process which only uses regular newer-into-older merging.
// This is done by appending a fresh empty version with the newer schema after
// the version-to-be-upgraded, and merging the version-to-be-upgraded into the new one.
// In the end, only the new version with the newer schema is left.
//
// Technically the above procedure temporarily violates the rule that schema versions
// in the chain are ordered chronologically (which is needed for correctness).
// So while the above is happening, the version-to-be-upgraded has _is_being_upgraded set.
// A version with _is_being_upgraded is understood to be special in that its
// schema is older than its next neighbour's, and care is taken so that the
// neighbour isn't recursively downgraded back to the older schema.
// A version with _is_being_upgraded can be viewed together with its next() as
// conceptually a single version with the schema of next().
//
// The typical upgrade sequence, illustrated:
// 1. Initial state:
// pv1 (s1)
// ^
// |
// pe
// 2. partition_entry::upgrade(s2) is called. Empty pv2 is added.
// pv2 (s2) -- pv1 (s1)
// ^ ^
// | |
// pe ps1 (created and instantly dropped, so that merging is initiated)
// 3. Some time later, mutation_cleaner calls merge_partition_versions(ps1).
// Merge of pv2 and pv1 is attempted.
// Schemas differ, so instead an upgrade of pv1 is initiated. Empty pv1' is added.
// pv1 is now conceptually "owned" by pv1', and no snapshot is allowed to point to it
// after this point.
// pv2 (s2) -- pv1 (s1, _is_being_upgraded) -- pv1' (s2)
// ^ ^
// | |
// pe ps1
// 4. Eventually pv1 is fully upgrade-merged into pv1' and destroyed.
// pv2 (s2) -- pv1' (s2)
// ^ ^
// | |
// pe ps1
// 5. Upgrade over, further merge proceeds as usual. Eventually pv2 is fully merged into pv1'.
// pv1' (s2)
// ^
// |
// pe
class partition_version_ref;
class partition_version : public anchorless_list_base_hook<partition_version> {
partition_version_ref* _backref = nullptr;
schema_ptr _schema;
bool _is_being_upgraded = false;
mutation_partition_v2 _partition;
friend class partition_version_ref;
@@ -120,9 +205,18 @@ public:
}
explicit partition_version(schema_ptr s) noexcept
: _partition(std::move(s)) { }
explicit partition_version(mutation_partition_v2 mp) noexcept
: _partition(std::move(mp)) { }
: _schema(std::move(s))
, _partition(*_schema)
{
assert(_schema);
}
explicit partition_version(mutation_partition_v2 mp, schema_ptr s) noexcept
: _schema(std::move(s))
, _partition(std::move(mp))
{
assert(_schema);
}
partition_version(partition_version&& pv) noexcept;
partition_version& operator=(partition_version&& pv) noexcept;
~partition_version();
@@ -138,7 +232,9 @@ public:
bool is_referenced_from_entry() const;
partition_version_ref& back_reference() const { return *_backref; }
size_t size_in_allocator(const schema& s, allocation_strategy& allocator) const;
size_t size_in_allocator(allocation_strategy& allocator) const;
const schema_ptr& get_schema() const noexcept { return _schema; }
};
using partition_version_range = anchorless_list_base_hook<partition_version>::range;
@@ -256,7 +352,6 @@ public:
}
};
private:
schema_ptr _schema;
// Either _version or _entry is non-null.
partition_version_ref _version;
partition_entry* _entry;
@@ -270,13 +365,12 @@ private:
friend class partition_entry;
friend class mutation_cleaner_impl;
public:
explicit partition_snapshot(schema_ptr s,
logalloc::region& region,
explicit partition_snapshot(logalloc::region& region,
mutation_cleaner& cleaner,
partition_entry* entry,
cache_tracker* tracker, // non-null for evictable snapshots
phase_type phase = default_phase)
: _schema(std::move(s)), _entry(entry), _phase(phase), _region(&region), _cleaner(&cleaner), _tracker(tracker) { }
: _entry(entry), _phase(phase), _region(&region), _cleaner(&cleaner), _tracker(tracker) { }
partition_snapshot(const partition_snapshot&) = delete;
partition_snapshot(partition_snapshot&&) = delete;
partition_snapshot& operator=(const partition_snapshot&) = delete;
@@ -358,7 +452,7 @@ public:
return !version()->next();
}
const schema_ptr& schema() const { return _schema; }
const schema_ptr& schema() const { return version()->get_schema(); }
logalloc::region& region() const { return *_region; }
cache_tracker* tracker() const { return _tracker; }
mutation_cleaner& cleaner() { return *_cleaner; }
@@ -439,7 +533,7 @@ public:
// Constructs a non-evictable entry holding empty partition
partition_entry() = default;
// Constructs a non-evictable entry
explicit partition_entry(mutation_partition_v2);
partition_entry(const schema&, mutation_partition_v2);
partition_entry(const schema&, mutation_partition);
// Returns a reference to partition_entry containing given pv,
// assuming pv.is_referenced_from_entry().
@@ -523,24 +617,28 @@ public:
const schema& mp_schema,
mutation_application_stats& app_stats);
// Adds mutation_partition represented by "pe" to the one represented
// by this entry.
// This entry must be evictable.
// "pe" must be fully-continuous.
// (Alternatively: applies the "pe" memtable entry to "this" cache entry.)
//
// The argument must be fully-continuous.
//
// The continuity of this entry remains unchanged. Information from "other"
// The continuity of this entry remains unchanged. Information from "pe"
// which is incomplete in this instance is dropped. In other words, this
// performs set intersection on continuity information, drops information
// which falls outside of the continuity range, and applies regular merging
// rules for the rest.
// (Rationale: updates from the memtable are only applied to intervals
// which were already in cache. The cache treats the entire sstable set as a
// single source -- it isn't able to store partial information only from a
// single sstable.)
//
// Weak exception guarantees.
// If an exception is thrown, "this" and "pe" will be left in some valid states
// such that if the operation is retried (possibly many times) and eventually
// succeeds the result will be as if the first attempt didn't fail.
//
// The schema of pe must conform to s.
// The schema of "pe" must conform to "s".
//
// Returns a coroutine object representing the operation.
// The coroutine must be resumed with the region being unlocked.
@@ -581,27 +679,27 @@ public:
return *_version;
}
mutation_partition_v2 squashed(schema_ptr from, schema_ptr to, is_evictable);
mutation_partition_v2 squashed_v2(const schema& to, is_evictable);
mutation_partition squashed(const schema&, is_evictable);
tombstone partition_tombstone() const;
// needs to be called with reclaiming disabled
// Must not be called when is_locked().
void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*);
void upgrade(logalloc::region& r, schema_ptr to, mutation_cleaner&, cache_tracker*);
const schema_ptr& get_schema() const noexcept { return _version->get_schema(); }
// Snapshots with different values of phase will point to different partition_version objects.
// When is_locked(), read() can only be called with a phase which is <= the phase of the current snapshot.
partition_snapshot_ptr read(logalloc::region& region,
mutation_cleaner&,
schema_ptr entry_schema,
cache_tracker*,
partition_snapshot::phase_type phase = partition_snapshot::default_phase);
class printer {
const schema& _schema;
const partition_entry& _partition_entry;
public:
printer(const schema& s, const partition_entry& pe) : _schema(s), _partition_entry(pe) { }
printer(const partition_entry& pe) : _partition_entry(pe) { }
printer(const printer&) = delete;
printer(printer&&) = delete;

View File

@@ -111,6 +111,7 @@ class partition_snapshot_row_cursor final {
mutation_partition::rows_type::iterator it;
utils::immutable_collection<mutation_partition::rows_type> rows;
int version_no;
const schema* schema;
bool unique_owner = false;
is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain).
@@ -202,7 +203,7 @@ class partition_snapshot_row_cursor final {
position_in_version& v = _heap.back();
rows_entry& e = *v.it;
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
e.row().cells().prepare_hash(*v.schema, column_kind::regular_column);
}
_dummy &= bool(e.dummy());
_continuous |= bool(v.continuous);
@@ -280,7 +281,7 @@ class partition_snapshot_row_cursor final {
rt = pos->range_tombstone();
}
if (pos) [[likely]] {
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont, rt});
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, cont, rt});
}
} else {
if (_reversed) [[unlikely]] {
@@ -293,7 +294,7 @@ class partition_snapshot_row_cursor final {
_background_continuity = true; // Default continuity past the last entry
}
if (pos) [[likely]] {
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, is_continuous::yes});
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, is_continuous::yes});
}
}
++version_no;
@@ -469,7 +470,7 @@ public:
}
} else if (match) {
_current_row.insert(_current_row.begin(), position_in_version{
it, std::move(rows), 0, _unique_owner, cont, rt});
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
if (heap_i != _heap.end()) {
_heap.erase(heap_i);
boost::range::make_heap(_heap, heap_less);
@@ -482,7 +483,7 @@ public:
boost::range::make_heap(_heap, heap_less);
} else {
_heap.push_back(position_in_version{
it, std::move(rows), 0, _unique_owner, cont, rt});
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
boost::range::push_heap(_heap, heap_less);
}
}
@@ -556,9 +557,9 @@ public:
clustering_row row() const {
// Note: if the precondition ("cursor is valid and pointing at a row") is fulfilled
// then _current_row is not empty, so the below is valid.
clustering_row cr(key(), deletable_row(_schema, _current_row[0].it->row()));
clustering_row cr(key(), deletable_row(_schema, *_current_row[0].schema, _current_row[0].it->row()));
for (size_t i = 1; i < _current_row.size(); ++i) {
cr.apply(_schema, _current_row[i].it->row());
cr.apply(_schema, *_current_row[i].schema, _current_row[i].it->row());
}
return cr;
}
@@ -571,32 +572,23 @@ public:
// Can be called only when cursor is valid and pointing at a row.
// Monotonic exception guarantees.
template <typename Consumer>
requires std::is_invocable_v<Consumer, deletable_row>
requires std::is_invocable_v<Consumer, deletable_row&&>
void consume_row(Consumer&& consumer) {
for (position_in_version& v : _current_row) {
if (v.unique_owner) {
if (v.unique_owner && (_schema.version() == v.schema->version())) [[likely]] {
consumer(std::move(v.it->row()));
} else {
consumer(deletable_row(_schema, v.it->row()));
consumer(deletable_row(_schema, *v.schema, v.it->row()));
}
}
}
// Can be called only when cursor is valid and pointing at a row.
template <typename Consumer>
requires std::is_invocable_v<Consumer, const deletable_row&>
void consume_row(Consumer&& consumer) const {
for (const position_in_version& v : _current_row) {
consumer(v.it->row());
}
}
// Returns memory footprint of row entries under the cursor.
// Can be called only when cursor is valid and pointing at a row.
size_t memory_usage() const {
size_t result = 0;
for (const position_in_version& v : _current_row) {
result += v.it->memory_usage(_schema);
result += v.it->memory_usage(*v.schema);
}
return result;
}
@@ -631,7 +623,7 @@ public:
is_dummy(!_position.is_clustering_row()), is_continuous::no));
} else {
return alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].it));
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].schema, *_current_row[0].it));
}
}();
rows_entry& re = *e;
@@ -707,7 +699,7 @@ public:
}
}
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(_schema, pos,
current_allocator().construct<rows_entry>(*_snp.version()->get_schema(), pos,
is_dummy(!pos.is_clustering_row()),
is_continuous::no));
if (latest_i && latest_i->continuous()) {

View File

@@ -225,7 +225,7 @@ memtable::find_or_create_partition(const dht::decorated_key& key) {
if (i == partitions.end() || !hint.match) {
partitions_type::iterator entry = partitions.emplace_before(i,
key.token().raw(), hint,
_schema, dht::decorated_key(key), mutation_partition(_schema));
_schema, dht::decorated_key(key), mutation_partition(*_schema));
++nr_partitions;
++_table_stats.memtable_partition_insertions;
if (!hint.emplace_keeps_iterators()) {
@@ -696,7 +696,7 @@ public:
};
partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
return _pe.read(mtbl.region(), mtbl.cleaner(), _schema, no_cache_tracker);
return _pe.read(mtbl.region(), mtbl.cleaner(), no_cache_tracker);
}
flat_mutation_reader_v2_opt
@@ -793,7 +793,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_han
with_allocator(allocator(), [this, &m, &m_schema] {
_allocating_section(*this, [&, this] {
auto& p = find_or_create_partition_slow(m.key());
mutation_partition mp(m_schema);
mutation_partition mp(*m_schema);
partition_builder pb(*m_schema, mp);
m.partition().accept(*m_schema, pb);
_stats_collector.update(*m_schema, mp);
@@ -821,8 +821,7 @@ mutation_source memtable::as_data_source() {
}
memtable_entry::memtable_entry(memtable_entry&& o) noexcept
: _schema(std::move(o._schema))
, _key(std::move(o._key))
: _key(std::move(o._key))
, _pe(std::move(o._pe))
, _flags(o._flags)
{ }
@@ -839,19 +838,16 @@ bool memtable::is_flushed() const noexcept {
return bool(_underlying);
}
void memtable_entry::upgrade_schema(const schema_ptr& s, mutation_cleaner& cleaner) {
if (_schema != s) {
partition().upgrade(_schema, s, cleaner, no_cache_tracker);
_schema = s;
void memtable_entry::upgrade_schema(logalloc::region& r, const schema_ptr& s, mutation_cleaner& cleaner) {
if (schema() != s) {
partition().upgrade(r, s, cleaner, no_cache_tracker);
}
}
void memtable::upgrade_entry(memtable_entry& e) {
if (e._schema != _schema) {
if (e.schema() != _schema) {
assert(!reclaiming_enabled());
with_allocator(allocator(), [this, &e] {
e.upgrade_schema(_schema, cleaner());
});
e.upgrade_schema(region(), _schema, cleaner());
}
}
@@ -870,7 +866,7 @@ std::ostream& operator<<(std::ostream& out, memtable& mt) {
}
std::ostream& operator<<(std::ostream& out, const memtable_entry& mt) {
return out << "{" << mt.key() << ": " << partition_entry::printer(*mt.schema(), mt.partition()) << "}";
return out << "{" << mt.key() << ": " << partition_entry::printer(mt.partition()) << "}";
}
}

View File

@@ -33,7 +33,6 @@ namespace bi = boost::intrusive;
namespace replica {
class memtable_entry {
schema_ptr _schema;
dht::decorated_key _key;
partition_entry _pe;
struct {
@@ -52,9 +51,8 @@ public:
friend class memtable;
memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p)
: _schema(std::move(s))
, _key(std::move(key))
, _pe(*_schema, std::move(p))
: _key(std::move(key))
, _pe(*s, std::move(p))
{ }
memtable_entry(memtable_entry&& o) noexcept;
@@ -65,13 +63,12 @@ public:
dht::decorated_key& key() { return _key; }
const partition_entry& partition() const { return _pe; }
partition_entry& partition() { return _pe; }
const schema_ptr& schema() const { return _schema; }
schema_ptr& schema() { return _schema; }
const schema_ptr& schema() const { return _pe.get_schema(); }
partition_snapshot_ptr snapshot(memtable& mtbl);
// Makes the entry conform to given schema.
// Must be called under allocating section of the region which owns the entry.
void upgrade_schema(const schema_ptr&, mutation_cleaner&);
void upgrade_schema(logalloc::region&, const schema_ptr&, mutation_cleaner&);
size_t external_memory_usage_without_rows() const {
return _key.key().external_memory_usage();
@@ -86,7 +83,7 @@ public:
size_t size_in_allocator(allocation_strategy& allocator) {
auto size = size_in_allocator_without_rows(allocator);
for (auto&& v : _pe.versions()) {
size += v.size_in_allocator(*_schema, allocator);
size += v.size_in_allocator(allocator);
}
return size;
}

View File

@@ -846,7 +846,7 @@ cache_entry& row_cache::find_or_create_incomplete(const partition_start& ps, row
cache_entry& row_cache::find_or_create_missing(const dht::decorated_key& key) {
return do_find_or_create_entry(key, nullptr, [&] (auto i, const partitions_type::bound_hint& hint) {
mutation_partition mp(_schema);
mutation_partition mp(*_schema);
bool cont = i->continuous();
partitions_type::iterator entry = _partitions.emplace_before(i, key.token().raw(), hint,
_schema, key, std::move(mp));
@@ -1024,9 +1024,9 @@ future<> row_cache::update(external_updater eu, replica::memtable& m) {
if (cache_i != partitions_end() && hint.match) {
cache_entry& entry = *cache_i;
upgrade_entry(entry);
assert(entry._schema == _schema);
assert(entry.schema() == _schema);
_tracker.on_partition_merge();
mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner());
mem_e.upgrade_schema(_tracker.region(), _schema, _tracker.memtable_cleaner());
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(),
alloc, _tracker.region(), _tracker, _underlying_phase, acc);
} else if (cache_i->continuous()
@@ -1035,10 +1035,10 @@ future<> row_cache::update(external_updater eu, replica::memtable& m) {
// Partition is absent in underlying. First, insert a neutral partition entry.
partitions_type::iterator entry = _partitions.emplace_before(cache_i, mem_e.key().token().raw(), hint,
cache_entry::evictable_tag(), _schema, dht::decorated_key(mem_e.key()),
partition_entry::make_evictable(*_schema, mutation_partition(_schema)));
partition_entry::make_evictable(*_schema, mutation_partition(*_schema)));
entry->set_continuous(cache_i->continuous());
_tracker.insert(*entry);
mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner());
mem_e.upgrade_schema(_tracker.region(), _schema, _tracker.memtable_cleaner());
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(),
alloc, _tracker.region(), _tracker, _underlying_phase, acc);
} else {
@@ -1194,8 +1194,7 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
}
cache_entry::cache_entry(cache_entry&& o) noexcept
: _schema(std::move(o._schema))
, _key(std::move(o._key))
: _key(std::move(o._key))
, _pe(std::move(o._pe))
, _flags(o._flags)
{
@@ -1296,9 +1295,9 @@ flat_mutation_reader_v2 cache_entry::read(row_cache& rc, std::unique_ptr<read_co
// Assumes reader is in the corresponding partition
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, read_context& reader) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, reader.phase());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, reader.native_slice(), _key.key());
schema_ptr entry_schema = to_query_domain(reader.slice(), _schema);
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker, reader.phase());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*schema(), reader.native_slice(), _key.key());
schema_ptr entry_schema = to_query_domain(reader.slice(), schema());
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, reader, std::move(snp));
r.upgrade_schema(to_query_domain(reader.slice(), rc.schema()));
r.upgrade_schema(reader.schema());
@@ -1306,10 +1305,10 @@ flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, read_context& reader
}
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, unique_ctx->phase());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, unique_ctx->native_slice(), _key.key());
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker, unique_ctx->phase());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*schema(), unique_ctx->native_slice(), _key.key());
schema_ptr reader_schema = unique_ctx->schema();
schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), _schema);
schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), schema());
auto rc_schema = to_query_domain(unique_ctx->slice(), rc.schema());
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp));
r.upgrade_schema(rc_schema);
@@ -1322,13 +1321,10 @@ const schema_ptr& row_cache::schema() const {
}
void row_cache::upgrade_entry(cache_entry& e) {
if (e._schema != _schema && !e.partition().is_locked()) {
if (e.schema() != _schema && !e.partition().is_locked()) {
auto& r = _tracker.region();
assert(!r.reclaiming_enabled());
with_allocator(r.allocator(), [this, &e] {
e.partition().upgrade(e._schema, _schema, _tracker.cleaner(), &_tracker);
e._schema = _schema;
});
e.partition().upgrade(r, _schema, _tracker.cleaner(), &_tracker);
}
}
@@ -1371,6 +1367,6 @@ std::ostream& operator<<(std::ostream& out, const cache_entry& e) {
return out << "{cache_entry: " << e.position()
<< ", cont=" << e.continuous()
<< ", dummy=" << e.is_dummy_entry()
<< ", " << partition_entry::printer(*e.schema(), e.partition())
<< ", " << partition_entry::printer(e.partition())
<< "}";
}

View File

@@ -50,7 +50,6 @@ class lsa_manager;
//
// TODO: Make memtables use this format too.
class cache_entry {
schema_ptr _schema;
dht::decorated_key _key;
partition_entry _pe;
// True when we know that there is nothing between this entry and the previous one in cache
@@ -86,9 +85,8 @@ public:
}
cache_entry(schema_ptr s, const dht::decorated_key& key, const mutation_partition& p)
: _schema(std::move(s))
, _key(key)
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(*_schema, p)))
: _key(key)
, _pe(partition_entry::make_evictable(*s, mutation_partition(*s, p)))
{ }
cache_entry(schema_ptr s, dht::decorated_key&& key, mutation_partition&& p)
@@ -99,8 +97,7 @@ public:
// It is assumed that pe is fully continuous
// pe must be evictable.
cache_entry(evictable_tag, schema_ptr s, dht::decorated_key&& key, partition_entry&& pe) noexcept
: _schema(std::move(s))
, _key(std::move(key))
: _key(std::move(key))
, _pe(std::move(pe))
{ }
@@ -130,8 +127,7 @@ public:
const partition_entry& partition() const noexcept { return _pe; }
partition_entry& partition() { return _pe; }
const schema_ptr& schema() const noexcept { return _schema; }
schema_ptr& schema() noexcept { return _schema; }
const schema_ptr& schema() const noexcept { return _pe.get_schema(); }
flat_mutation_reader_v2 read(row_cache&, cache::read_context&);
flat_mutation_reader_v2 read(row_cache&, std::unique_ptr<cache::read_context>);
flat_mutation_reader_v2 read(row_cache&, cache::read_context&, utils::phased_barrier::phase_type);

View File

@@ -4509,7 +4509,7 @@ public:
const mutation& m = z.get<1>().mut;
for (const version& v : z.get<0>()) {
auto diff = v.par
? m.partition().difference(schema, (co_await v.par->mut().unfreeze_gently(schema)).partition())
? m.partition().difference(*schema, (co_await v.par->mut().unfreeze_gently(schema)).partition())
: mutation_partition(*schema, m.partition());
std::optional<mutation> mdiff;
if (!diff.empty()) {

View File

@@ -190,7 +190,7 @@ public:
static partition_snapshot_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) {
return rc._read_section(rc._tracker.region(), [&] {
cache_entry& e = rc.lookup(dk);
return e.partition().read(rc._tracker.region(), rc._tracker.cleaner(), e.schema(), &rc._tracker);
return e.partition().read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker);
});
}
};

View File

@@ -270,7 +270,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
// Difference
m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m2.partition()));
m = mutation(s, m1.decorated_key(), m1.partition().difference(*s, m2.partition()));
ac = get_counter_cell(m);
BOOST_REQUIRE(ac.is_live());
{
@@ -287,7 +287,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
verify_shard_order(ccv);
}
m = mutation(s, m1.decorated_key(), m2.partition().difference(s, m1.partition()));
m = mutation(s, m1.decorated_key(), m2.partition().difference(*s, m1.partition()));
ac = get_counter_cell(m);
BOOST_REQUIRE(ac.is_live());
{
@@ -304,11 +304,11 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
verify_shard_order(ccv);
}
m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m3.partition()));
m = mutation(s, m1.decorated_key(), m1.partition().difference(*s, m3.partition()));
BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0);
BOOST_REQUIRE(m.partition().static_row().empty());
m = mutation(s, m1.decorated_key(), m3.partition().difference(s, m1.partition()));
m = mutation(s, m1.decorated_key(), m3.partition().difference(*s, m1.partition()));
ac = get_counter_cell(m);
BOOST_REQUIRE(!ac.is_live());

View File

@@ -1103,7 +1103,7 @@ SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
m2 = mutation_partition_v2(s, second.partition());
});
auto check = defer([&] {
m.apply_monotonically(s, std::move(m2), no_cache_tracker, app_stats, is_evictable::no);
m.apply(s, std::move(m2));
assert_that(target.schema(), m).is_equal_to_compacted(expected.partition());
});
auto continuity_check = defer([&] {
@@ -1120,7 +1120,7 @@ SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
}
});
apply_resume res;
if (m.apply_monotonically(s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) {
if (m.apply_monotonically(s, s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) {
continuity_check.cancel();
seastar::memory::local_failure_injector().cancel();
}
@@ -1195,7 +1195,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
m12.apply(m1);
m12.apply(m2);
auto m2_1 = m2.partition().difference(s, m1.partition());
auto m2_1 = m2.partition().difference(*s, m1.partition());
BOOST_REQUIRE_EQUAL(m2_1.partition_tombstone(), tombstone());
BOOST_REQUIRE(!m2_1.static_row().size());
BOOST_REQUIRE(!m2_1.find_row(*s, ckey1));
@@ -1212,7 +1212,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
m12_1.partition().apply(*s, m2_1, *s, app_stats);
BOOST_REQUIRE_EQUAL(m12, m12_1);
auto m1_2 = m1.partition().difference(s, m2.partition());
auto m1_2 = m1.partition().difference(*s, m2.partition());
BOOST_REQUIRE_EQUAL(m1_2.partition_tombstone(), m12.partition().partition_tombstone());
BOOST_REQUIRE(m1_2.find_row(*s, ckey1));
BOOST_REQUIRE(m1_2.find_row(*s, ckey2));
@@ -1230,10 +1230,10 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
m12_2.partition().apply(*s, m1_2, *s, app_stats);
BOOST_REQUIRE_EQUAL(m12, m12_2);
auto m3_12 = m3.partition().difference(s, m12.partition());
auto m3_12 = m3.partition().difference(*s, m12.partition());
BOOST_REQUIRE(m3_12.empty());
auto m12_3 = m12.partition().difference(s, m3.partition());
auto m12_3 = m12.partition().difference(*s, m3.partition());
BOOST_REQUIRE_EQUAL(m12_3.partition_tombstone(), m12.partition().partition_tombstone());
mutation m123(s, partition_key::from_single_value(*s, "key1"));
@@ -1364,13 +1364,13 @@ SEASTAR_TEST_CASE(test_query_digest) {
auto m3 = m2;
{
auto diff = m1.partition().difference(s, m2.partition());
auto diff = m1.partition().difference(*s, m2.partition());
m3.partition().apply(*m3.schema(), std::move(diff), app_stats);
}
auto m4 = m1;
{
auto diff = m2.partition().difference(s, m1.partition());
auto diff = m2.partition().difference(*s, m1.partition());
m4.partition().apply(*m4.schema(), std::move(diff), app_stats);
}
@@ -1881,7 +1881,7 @@ SEASTAR_TEST_CASE(test_collection_cell_diff) {
mutation m12 = m1;
m12.apply(m2);
auto diff = m12.partition().difference(s, m1.partition());
auto diff = m12.partition().difference(*s, m1.partition());
BOOST_REQUIRE(!diff.empty());
BOOST_REQUIRE(m2.partition().equal(*s, diff));
});
@@ -1919,11 +1919,11 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
auto m12 = m1;
m12.apply(m2);
auto m12_with_diff = m1;
m12_with_diff.partition().apply(*s, m2.partition().difference(s, m1.partition()), app_stats);
m12_with_diff.partition().apply(*s, m2.partition().difference(*s, m1.partition()), app_stats);
check_partitions_match(m12.partition(), m12_with_diff.partition(), *s);
check_partitions_match(mutation_partition{s}, m1.partition().difference(s, m1.partition()), *s);
check_partitions_match(m1.partition(), m1.partition().difference(s, mutation_partition{s}), *s);
check_partitions_match(mutation_partition{s}, mutation_partition{s}.difference(s, m1.partition()), *s);
check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
});
});
}
@@ -2102,7 +2102,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) {
auto to_apply = mutation_partition_v2(s, m2_v2);
apply_resume res;
while (result_v2.apply_monotonically(s, std::move(to_apply), no_cache_tracker, app_stats,
while (result_v2.apply_monotonically(s, s, std::move(to_apply), no_cache_tracker, app_stats,
[&] () noexcept { return preempt(); }, res, is_evictable::no) == stop_iteration::no) {
seastar::thread::maybe_yield();
}
@@ -2132,7 +2132,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) {
static void clear(cache_tracker& tracker, const schema& s, mutation_partition_v2& p) {
while (p.clear_gently(&tracker) == stop_iteration::no) {}
p = mutation_partition_v2(s.shared_from_this());
p = mutation_partition_v2(s);
tracker.insert(p);
}
@@ -2200,7 +2200,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_evictable_snapshot) {
clear(tracker, s, result_v2);
});
apply_resume res;
while (result_v2.apply_monotonically(s, std::move(m2_v2), &tracker, app_stats, is_preemptible::yes, res,
while (result_v2.apply_monotonically(s, s, std::move(m2_v2), &tracker, app_stats, default_preemption_check(), res,
is_evictable::yes) == stop_iteration::no) {
seastar::thread::maybe_yield();
}
@@ -2285,7 +2285,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) {
mutation_application_stats app_stats;
apply_resume resume;
m1_v2.apply_monotonically(s, std::move(m2_v2), nullptr, app_stats, is_preemptible::no, resume,
m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume,
is_evictable::yes);
BOOST_REQUIRE(m1_v2.is_fully_continuous());
@@ -2306,7 +2306,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) {
// m2_v2: --------------- 5 ==(rt)== 7 [rt] ---
// m1_v2: === 1 === 3 =========================
m1_v2.apply_monotonically(s, std::move(m2_v2), nullptr, app_stats, is_preemptible::no, resume, is_evictable::yes);
m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume, is_evictable::yes);
BOOST_REQUIRE(m1_v2.is_fully_continuous());
assert_that(ss.schema(), m1_v2).is_equal_to_compacted(s, (m1 + m2).partition());

View File

@@ -41,7 +41,7 @@ static thread_local mutation_application_stats app_stats_for_tests;
// The cursor must be pointing at a row and valid.
// The cursor will not be pointing at a row after this.
static mutation_partition read_partition_from(const schema& schema, partition_snapshot_row_cursor& cur) {
mutation_partition p(schema.shared_from_this());
mutation_partition p(schema);
position_in_partition prev = position_in_partition::before_all_clustered_rows();
do {
testlog.trace("cur: {}", cur);
@@ -170,14 +170,14 @@ public:
void upgrade(schema_ptr new_schema) {
_container.allocate_in_region([&] {
_e.upgrade(_s, new_schema, _container.cleaner(), _container.tracker());
_e.upgrade(_container.region(), new_schema, _container.cleaner(), _container.tracker());
_s = new_schema;
});
}
partition_snapshot_ptr read() {
return _container.allocate_in_region([&] {
return _e.read(region(), _container.cleaner(), schema(), _container.tracker(), _container.phase());
return _e.read(region(), _container.cleaner(), _container.tracker(), _container.phase());
});
}
@@ -194,7 +194,7 @@ void mvcc_partition::apply_to_evictable(partition_entry&& src, schema_ptr src_sc
mutation_cleaner src_cleaner(region(), no_cache_tracker, app_stats_for_tests);
auto c = as(region(), [&] {
if (_s != src_schema) {
src.upgrade(src_schema, _s, src_cleaner, no_cache_tracker);
src.upgrade(region(), _s, src_cleaner, no_cache_tracker);
}
return _e.apply_to_incomplete(*schema(), std::move(src), src_cleaner, as, region(),
*_container.tracker(), _container.next_phase(), _container.accounter());
@@ -221,7 +221,7 @@ mvcc_partition& mvcc_partition::operator+=(const mutation& m) {
void mvcc_partition::apply(const mutation_partition& mp, schema_ptr mp_s) {
with_allocator(region().allocator(), [&] {
if (_evictable) {
apply_to_evictable(partition_entry(mutation_partition_v2(*mp_s, mp)), mp_s);
apply_to_evictable(partition_entry(*mp_s, mutation_partition_v2(*mp_s, mp)), mp_s);
} else {
logalloc::allocating_section as;
as(region(), [&] {
@@ -249,7 +249,7 @@ mvcc_partition mvcc_container::make_not_evictable(const mutation_partition& mp)
return with_allocator(region().allocator(), [&] {
logalloc::allocating_section as;
return as(region(), [&] {
return mvcc_partition(_schema, partition_entry(mutation_partition_v2(*_schema, mp)), *this, false);
return mvcc_partition(_schema, partition_entry(*_schema, mutation_partition_v2(*_schema, mp)), *this, false);
});
});
}
@@ -304,7 +304,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete) {
assert_that(table.schema(), e.squashed()).is_equal_to((m2 + m3).partition());
// Check that snapshot data is not stolen when its entry is applied
auto e2 = ms.make_evictable(mutation_partition(table.schema()));
auto e2 = ms.make_evictable(mutation_partition(s));
e2 += std::move(e);
assert_that(table.schema(), ms.squashed(snap1)).is_equal_to(m1.partition());
assert_that(table.schema(), e2.squashed()).is_equal_to((m2 + m3).partition());
@@ -381,7 +381,7 @@ SEASTAR_TEST_CASE(test_eviction_with_active_reader) {
auto ck1 = table.make_ckey(1);
auto ck2 = table.make_ckey(2);
auto e = ms.make_evictable(mutation_partition(table.schema()));
auto e = ms.make_evictable(mutation_partition(s));
mutation m1(table.schema(), pk);
m1.partition().clustered_row(s, ck2);
@@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
}
auto expected = mutation_partition(*s, before);
expected.apply_weak(*s, std::move(expected_to_apply_slice), app_stats);
expected.apply(*s, std::move(expected_to_apply_slice), app_stats);
e += to_apply;
@@ -505,7 +505,7 @@ void evict_with_consistency_check(mvcc_container& ms, mvcc_partition& e, const m
testlog.trace("evicting");
auto ret = ms.tracker()->evict_from_lru_shallow();
testlog.trace("entry: {}", partition_entry::printer(s, e.entry()));
testlog.trace("entry: {}", partition_entry::printer(e.entry()));
auto p = e.squashed();
auto cont = p.get_continuity(s);
@@ -553,7 +553,7 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
auto snap2 = e.read();
e += m3;
testlog.trace("e: {}", partition_entry::printer(*e.schema(), e.entry()));
testlog.trace("e: {}", partition_entry::printer(e.entry()));
auto expected = e.squashed();
auto snap = e.read();
@@ -592,14 +592,14 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging_for_nonevictab
{
mutation_application_stats app_stats;
logalloc::reclaim_lock rl(r);
auto e = partition_entry(mutation_partition_v2(*s, m3.partition()));
auto snap1 = e.read(r, cleaner, s, no_cache_tracker);
auto e = partition_entry(*s, mutation_partition_v2(*s, m3.partition()));
auto snap1 = e.read(r, cleaner, no_cache_tracker);
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
auto snap2 = e.read(r, cleaner, no_cache_tracker);
e.apply(r, cleaner, *s, m1.partition(), *s, app_stats);
auto expected = e.squashed(*s, is_evictable::no);
auto snap = e.read(r, cleaner, s, no_cache_tracker);
auto snap = e.read(r, cleaner, no_cache_tracker);
auto actual = read_using_cursor(*snap);
BOOST_REQUIRE(expected.is_fully_continuous());
@@ -636,7 +636,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) {
{
logalloc::reclaim_lock rl(r);
auto e = partition_entry::make_evictable(*s, m1.partition());
auto snap1 = e.read(r, tracker.cleaner(), s, &tracker);
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
e.add_version(*s, &tracker).partition()
.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no);
e.add_version(*s, &tracker).partition()
@@ -646,7 +646,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) {
expected.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no);
expected.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no);
auto snap = e.read(r, tracker.cleaner(), s, &tracker);
auto snap = e.read(r, tracker.cleaner(), &tracker);
auto actual = read_using_cursor(*snap);
auto actual2 = e.squashed(*s, is_evictable::yes);
@@ -670,8 +670,8 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
simple_schema table;
auto&& s = *table.schema();
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
auto e = partition_entry::make_evictable(s, mutation_partition(s));
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
{
auto&& p1 = snap1->version()->partition();
@@ -683,7 +683,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
p1.ensure_last_dummy(s);
}
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
partition_snapshot_row_cursor cur(s, *snap2);
position_in_partition::equal_compare eq(s);
@@ -841,8 +841,8 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) {
simple_schema table;
auto&& s = *table.schema();
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
auto e = partition_entry::make_evictable(s, mutation_partition(s));
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
int ck_0 = 10;
int ck_1 = 9;
@@ -862,7 +862,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) {
p1.ensure_last_dummy(s);
}
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
auto rev_s = s.make_reversed();
partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true);
@@ -1032,9 +1032,9 @@ SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) {
simple_schema table;
auto&& s = *table.schema();
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto e = partition_entry::make_evictable(s, mutation_partition(s));
tracker.insert(e);
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
{
auto&& p1 = snap1->version()->partition();
@@ -1044,7 +1044,7 @@ SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) {
p1.clustered_rows_entry(s, table.make_ckey(4), is_dummy::no, is_continuous::no));
}
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
{
auto&& p2 = snap2->version()->partition();
@@ -1173,7 +1173,7 @@ public:
, _tracker(t)
, _r(r)
, _e(_tracker ? partition_entry::make_evictable(*_schema, mutation_partition::make_incomplete(*_schema))
: partition_entry(mutation_partition_v2(_schema)))
: partition_entry(*_schema, mutation_partition_v2(*_schema)))
{
if (_tracker) {
_tracker->insert(_e);
@@ -1181,7 +1181,7 @@ public:
}
partition_entry_builder& new_version() {
_snapshots.emplace_back(_e.read(_r, _cleaner, _schema, _tracker, _snapshots.size()));
_snapshots.emplace_back(_e.read(_r, _cleaner, _tracker, _snapshots.size()));
_last_key = {};
return *this;
}
@@ -1537,8 +1537,8 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) {
simple_schema table;
auto&& s = *table.schema();
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
auto e = partition_entry::make_evictable(s, mutation_partition(s));
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
{
auto&& p1 = snap1->version()->partition();
@@ -1547,7 +1547,7 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) {
p1.ensure_last_dummy(s);
}
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
{
auto&& p2 = snap2->version()->partition();
@@ -1593,8 +1593,8 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reverse
simple_schema table;
auto&& s = *table.schema();
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
auto e = partition_entry::make_evictable(s, mutation_partition(s));
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
{
auto&& p1 = snap1->version()->partition();
@@ -1604,7 +1604,7 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reverse
p1.ensure_last_dummy(s);
}
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
{
auto&& p2 = snap2->version()->partition();
@@ -1654,7 +1654,7 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) {
while (true) {
logalloc::reclaim_lock rl(r);
mutation_partition_v2 m2 = mutation_partition_v2(*second.schema(), second.partition());
auto e = partition_entry(mutation_partition_v2(*target.schema(), target.partition()));
auto e = partition_entry(*target.schema(), mutation_partition_v2(*target.schema(), target.partition()));
//auto snap1 = e.read(r, gen.schema());
alloc.fail_after(fail_offset++);
@@ -1703,8 +1703,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
m3.partition().make_fully_continuous();
{
auto e = partition_entry(mutation_partition_v2(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, s, nullptr);
auto e = partition_entry(*s, mutation_partition_v2(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, nullptr);
{
mutation_application_stats app_stats;
@@ -1712,7 +1712,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
}
auto snap2 = e.read(r, cleaner, s, nullptr);
auto snap2 = e.read(r, cleaner, nullptr);
snap1 = {};
snap2 = {};
@@ -1724,8 +1724,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
}
{
auto e = partition_entry(mutation_partition_v2(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, s, nullptr);
auto e = partition_entry(*s, mutation_partition_v2(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, nullptr);
{
mutation_application_stats app_stats;
@@ -1733,7 +1733,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
}
auto snap2 = e.read(r, cleaner, s, nullptr);
auto snap2 = e.read(r, cleaner, nullptr);
snap2 = {};
snap1 = {};
@@ -2005,7 +2005,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) {
}
});
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(*s, e.entry()));
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(e.entry()));
mutation m2(s, ss.make_pkey());
// This one covers the dummy row for before(3) and before(2), marking the range [1, 3] as continuous.
@@ -2020,7 +2020,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) {
e += m3;
auto snp3 = e.read();
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(*s, e.entry()));
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(e.entry()));
auto expected = m0 + m1 + m2 + m3;
@@ -2033,3 +2033,104 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) {
evict_with_consistency_check(ms, e, expected.partition());
});
}
// Exercises "gentle" (preemptible) schema upgrades of a partition_entry:
// upgrade() appends an empty version with the new schema to the MVCC chain
// instead of eagerly squashing it, and the version merge — which performs the
// actual row upgrades — runs in the background via the cleaner.
SEASTAR_TEST_CASE(test_gentle_schema_upgrades) {
return seastar::async([] {
// Three increasing timestamps: ts1 for the initial writes, ts_drop for the
// drop-and-re-add of v4 (shadowing its ts1 value), ts2 for post-upgrade writes.
auto ts1 = api::new_timestamp();
auto ts_drop = api::new_timestamp();
auto ts2 = api::new_timestamp();
auto s1 = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck", utf8_type, column_kind::clustering_key)
.with_column("s1", utf8_type, column_kind::static_column)
.with_column("s2", utf8_type, column_kind::static_column)
.with_column("v1", utf8_type, column_kind::regular_column)
.with_column("v2", utf8_type, column_kind::regular_column)
.with_column("v3", utf8_type, column_kind::regular_column)
.with_column("v4", utf8_type, column_kind::regular_column)
.build();
// s2 removes s1 and v3, drops v4 at ts_drop and re-adds it (so v4's ts1
// value must not survive the upgrade), and adds a new column v5.
auto s2 = schema_builder(s1)
.remove_column("s1")
.remove_column("v3")
.without_column("v4", ts_drop).with_column("v4", utf8_type)
.with_column("v5", utf8_type)
.build();
// m1: the pre-upgrade mutation, written with schema s1. Touches every
// column so that the upgrade has something to discard (s1, v3, v4).
auto m1 = std::invoke([s1, ts1] {
auto x = mutation(s1, partition_key::from_single_value(*s1, serialized(0)));
auto ck = clustering_key::from_single_value(*s1, serialized(0));
x.set_static_cell("s1", "s1_value", ts1);
x.set_static_cell("s2", "s2_value", ts1);
x.set_clustered_cell(ck, "v1", "v1_value", ts1);
x.set_clustered_cell(ck, "v2", "v2_value", ts1);
x.set_clustered_cell(ck, "v3", "v3_value", ts1);
x.set_clustered_cell(ck, "v4", "v4_value", ts1);
x.partition().set_static_row_continuous(false);
x.partition().ensure_last_dummy(*s1);
return x;
});
// m2: the post-upgrade mutation, written with schema s2 at ts2. Overwrites
// v2 and writes the new column v5.
auto m2 = std::invoke([s2, ts2] {
auto x = mutation(s2, partition_key::from_single_value(*s2, serialized(0)));
auto ck = clustering_key::from_single_value(*s2, serialized(0));
x.set_clustered_cell(ck, "v2", "v2_value_new", ts2);
x.set_clustered_cell(ck, "v5", "v5_value_new", ts2);
x.partition().set_static_row_continuous(false);
x.partition().ensure_last_dummy(*s2);
return x;
});
// expected = m1 upgraded to s2, combined with m2: s1/v3 are gone, v4 is
// shadowed by ts_drop, v2 takes the newer ts2 value, v5 comes from m2.
auto expected = std::invoke([s2, ts1, ts2] {
auto x = mutation(s2, partition_key::from_single_value(*s2, serialized(0)));
auto ck = clustering_key::from_single_value(*s2, serialized(0));
x.set_static_cell("s2", "s2_value", ts1);
x.set_clustered_cell(ck, "v1", "v1_value", ts1);
x.set_clustered_cell(ck, "v2", "v2_value_new", ts2);
x.set_clustered_cell(ck, "v5", "v5_value_new", ts2);
x.partition().set_static_row_continuous(false);
x.partition().ensure_last_dummy(*s2);
return x;
});
{
// Test that the version merge is lazy.
// (This is not important and might be changed in the future.
// We often run some operations synchronously and only put them
// in the background after they preempt for the first time.)
mvcc_container ms(s1);
auto e = ms.make_evictable(m1.partition());
e.upgrade(s2);
// upgrade() must only extend the version chain (old version still linked).
BOOST_REQUIRE(e.entry().version()->next());
// Test that the upgrade initiated the merge.
ms.cleaner().drain().get();
// After the cleaner runs, the chain collapses back to a single version.
BOOST_REQUIRE(!e.entry().version()->next());
}
{
// Test that the on-the-fly merge gives the expected result.
mvcc_container ms(s1);
auto e = ms.make_evictable(m1.partition());
// Snapshots taken at each stage: before upgrade (s1 view), after upgrade
// (s2 view of m1 only), and after applying m2 (s2 view of the full data).
auto rd1 = e.read();
e.upgrade(s2);
auto rd2 = e.read();
e += m2;
auto rd3 = e.read();
// rd1 still observes the original data through schema s1.
assert_that(s1, read_using_cursor(*rd1)).is_equal_to(*s1, m1.partition());
// rd2 must see m1 as if it had been eagerly upgraded to s2.
auto rd2_expected = mutation_partition(*s1, m1.partition());
rd2_expected.upgrade(*s1, *s2);
assert_that(s2, read_using_cursor(*rd2)).is_equal_to(rd2_expected);
assert_that(s2, read_using_cursor(*rd3)).is_equal_to(*s2, expected.partition());
// Release the older snapshots so their versions become mergeable.
rd1 = {};
rd2 = {};
// Merge versions.
ms.cleaner().drain().get();
BOOST_REQUIRE(!e.entry().version()->next());
// Test that the background merge gives the expected result.
assert_that(s2, read_using_cursor(*rd3)).is_equal_to(*s2, expected.partition());
}
});
}

View File

@@ -950,6 +950,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) {
rd.fill_buffer().get();
}
tracker.cleaner().drain().get0();
while (tracker.region().evict_some() == memory::reclaiming_result::reclaimed_something) ;
// The partition should be evictable after schema change
@@ -3508,6 +3509,16 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
auto m2 = gen();
m2.partition().make_fully_continuous();
bool upgrade_schema = tests::random::get_bool();
if (upgrade_schema) {
schema_ptr new_schema = schema_builder(s)
.with_column(to_bytes("_phantom"), byte_type)
.remove_column("_phantom")
.build();
m2.upgrade(new_schema);
cache.set_schema(new_schema);
}
auto mt = make_lw_shared<replica::memtable>(m2.schema());
mt->apply(m2);
cache.update(row_cache::external_updater([&] () noexcept {