Merge ' mvcc: make schema upgrades gentle' from Michał Chojnowski
After a schema change, memtable and cache have to be upgraded to the new schema. Currently, they are upgraded (on the first access after a schema change) atomically, i.e. all rows of the entry are upgraded with one non-preemptible call. This is one of the last vestiges of the times when partitions were treated atomically, and it is a well-known source of numerous large stalls. This series makes schema upgrades gentle (preemptible). This is done by co-opting the existing MVCC machinery. Before the series, all partition_versions in the partition_entry chain have the same schema, and an entry upgrade replaces the entire chain with a single squashed and upgraded version. After the series, each partition_version has its own schema. A partition entry upgrade happens simply by adding an empty version with the new schema to the head of the chain. Row entries are upgraded to the current schema on-the-fly by the cursor during reads, and by the MVCC version merge ongoing in the background after the upgrade. The series: 1. Does some code cleanup in the mutation_partition area. 2. Adds a schema field to partition_version and removes it from its containers (partition_snapshot, cache_entry, memtable_entry). 3. Adds upgrading variants of constructors and apply() for `row` and its wrappers. 4. Prepares partition_snapshot_row_cursor, mutation_partition_v2::apply_monotonically and partition_snapshot::merge_partition_versions for dealing with heterogeneous version chains. 5. Modifies partition_entry::upgrade to perform upgrades by extending the version chain with a new schema instead of squashing it to a single upgraded version. 
Fixes #2577 Closes #13761 * github.com:scylladb/scylladb: test: mvcc_test: add a test for gentle schema upgrades partition_version: make partition_entry::upgrade() gentle partition_version: handle multi-schema snapshots in merge_partition_versions mutation_partition_v2: handle schema upgrades in apply_monotonically() partition_version: remove the unused "from" argument in partition_entry::upgrade() row_cache_test: prepare test_eviction_after_schema_change for gentle schema upgrades partition_version: handle multi-schema entries in partition_entry::squashed partition_snapshot_row_cursor: handle multi-schema snapshots partiton_version: prepare partition_snapshot::squashed() for multi-schema snapshots partition_version: prepare partition_snapshot::static_row() for multi-schema snapshots partition_version: add a logalloc::region argument to partition_entry::upgrade() memtable: propagate the region to memtable_entry::upgrade_schema() mutation_partition: add an upgrading variant of lazy_row::apply() mutation_partition: add an upgrading variant of rows_entry::rows_entry mutation_partition: switch an apply() call to apply_monotonically() mutation_partition: add an upgrading variant of rows_entry::apply_monotonically() mutation_fragment: add an upgrading variant of clustering_row::apply() mutation_partition: add an upgrading variant of row::row partition_version: remove _schema from partition_entry::operator<< partition_version: remove the schema argument from partition_entry::read() memtable: remove _schema from memtable_entry row_cache: remove _schema from cache_entry partition_version: remove the _schema field from partition_snapshot partition_version: add a _schema field to partition_version mutation_partition: change schema_ptr to schema& in mutation_partition::difference mutation_partition: change schema_ptr to schema& in mutation_partition constructor mutation_partition_v2: change schema_ptr to schema& in mutation_partition_v2 constructor mutation_partition: add 
upgrading variants of row::apply() partition_version: update the comment to apply_to_incomplete() mutation_partition_v2: clean up variants of apply() mutation_partition: remove apply_weak() mutation_partition_v2: remove a misleading comment in apply_monotonically() row_cache_test: add schema changes to test_concurrent_reads_and_eviction mutation_partition: fix mixed-schema apply()
This commit is contained in:
@@ -487,7 +487,7 @@ mutation_partition& view_updates::partition_for(partition_key&& key) {
|
||||
if (it != _updates.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
|
||||
return _updates.emplace(std::move(key), mutation_partition(*_view)).first->second;
|
||||
}
|
||||
|
||||
size_t view_updates::op_count() const {
|
||||
|
||||
@@ -13,13 +13,13 @@
|
||||
mutation::data::data(dht::decorated_key&& key, schema_ptr&& schema)
|
||||
: _schema(std::move(schema))
|
||||
, _dk(std::move(key))
|
||||
, _p(_schema)
|
||||
, _p(*_schema)
|
||||
{ }
|
||||
|
||||
mutation::data::data(partition_key&& key_, schema_ptr&& schema)
|
||||
: _schema(std::move(schema))
|
||||
, _dk(dht::decorate_key(*_schema, std::move(key_)))
|
||||
, _p(_schema)
|
||||
, _p(*_schema)
|
||||
{ }
|
||||
|
||||
mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, const mutation_partition& mp)
|
||||
|
||||
@@ -90,6 +90,9 @@ public:
|
||||
void apply(const schema& s, const deletable_row& r) {
|
||||
_row.apply(s, r);
|
||||
}
|
||||
void apply(const schema& our_schema, const schema& their_schema, const deletable_row& r) {
|
||||
_row.apply(our_schema, their_schema, r);
|
||||
}
|
||||
|
||||
position_in_partition_view position() const;
|
||||
|
||||
|
||||
@@ -243,21 +243,6 @@ void mutation_partition::ensure_last_dummy(const schema& s) {
|
||||
}
|
||||
}
|
||||
|
||||
void mutation_partition::apply(const schema& s, const mutation_partition& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats) {
|
||||
apply_weak(s, p, p_schema, app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition::apply(const schema& s, mutation_partition&& p,
|
||||
mutation_application_stats& app_stats) {
|
||||
apply_weak(s, std::move(p), app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition::apply(const schema& s, mutation_partition_view p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats) {
|
||||
apply_weak(s, p, p_schema, app_stats);
|
||||
}
|
||||
|
||||
struct mutation_fragment_applier {
|
||||
const schema& _s;
|
||||
mutation_partition& _mp;
|
||||
@@ -489,7 +474,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
|
||||
if (s.version() == p_schema.version()) {
|
||||
return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, preemptible, res);
|
||||
} else {
|
||||
mutation_partition p2(s, p);
|
||||
mutation_partition p2(p_schema, p);
|
||||
p2.upgrade(p_schema, s);
|
||||
return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::no, res); // FIXME: make preemptible
|
||||
}
|
||||
@@ -508,7 +493,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
|
||||
}
|
||||
|
||||
void
|
||||
mutation_partition::apply_weak(const schema& s, mutation_partition_view p,
|
||||
mutation_partition::apply(const schema& s, mutation_partition_view p,
|
||||
const schema& p_schema, mutation_application_stats& app_stats) {
|
||||
// FIXME: Optimize
|
||||
mutation_partition p2(*this, copy_comparators_only{});
|
||||
@@ -517,13 +502,13 @@ mutation_partition::apply_weak(const schema& s, mutation_partition_view p,
|
||||
apply_monotonically(s, std::move(p2), p_schema, app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition::apply_weak(const schema& s, const mutation_partition& p,
|
||||
void mutation_partition::apply(const schema& s, const mutation_partition& p,
|
||||
const schema& p_schema, mutation_application_stats& app_stats) {
|
||||
// FIXME: Optimize
|
||||
apply_monotonically(s, mutation_partition(s, p), p_schema, app_stats);
|
||||
apply_monotonically(s, mutation_partition(p_schema, p), p_schema, app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
|
||||
void mutation_partition::apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
|
||||
apply_monotonically(s, std::move(p), no_cache_tracker, app_stats);
|
||||
}
|
||||
|
||||
@@ -1161,22 +1146,42 @@ deletable_row::equal(column_kind kind, const schema& s, const deletable_row& oth
|
||||
return _cells.equal(kind, s, other._cells, other_schema);
|
||||
}
|
||||
|
||||
void deletable_row::apply(const schema& s, const deletable_row& src) {
|
||||
apply_monotonically(s, src);
|
||||
}
|
||||
|
||||
void deletable_row::apply(const schema& s, deletable_row&& src) {
|
||||
apply_monotonically(s, std::move(src));
|
||||
}
|
||||
|
||||
void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) {
|
||||
_cells.apply(s, column_kind::regular_column, src._cells);
|
||||
void deletable_row::apply(const schema& s, const deletable_row& src) {
|
||||
apply_monotonically(s, src);
|
||||
}
|
||||
|
||||
void deletable_row::apply(const schema& our_schema, const schema& their_schema, deletable_row&& src) {
|
||||
apply_monotonically(our_schema, their_schema, std::move(src));
|
||||
}
|
||||
|
||||
void deletable_row::apply(const schema& our_schema, const schema& their_schema, const deletable_row& src) {
|
||||
apply_monotonically(our_schema, their_schema, src);
|
||||
}
|
||||
|
||||
void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) {
|
||||
_cells.apply_monotonically(s, column_kind::regular_column, std::move(src._cells));
|
||||
_marker.apply(src._marker);
|
||||
_deleted_at.apply(src._deleted_at, _marker);
|
||||
}
|
||||
|
||||
void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) {
|
||||
_cells.apply(s, column_kind::regular_column, std::move(src._cells));
|
||||
void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) {
|
||||
_cells.apply_monotonically(s, column_kind::regular_column, src._cells);
|
||||
_marker.apply(src._marker);
|
||||
_deleted_at.apply(src._deleted_at, _marker);
|
||||
}
|
||||
|
||||
void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src) {
|
||||
_cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, std::move(src._cells));
|
||||
_marker.apply(src._marker);
|
||||
_deleted_at.apply(src._deleted_at, _marker);
|
||||
}
|
||||
|
||||
void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src) {
|
||||
_cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, src._cells);
|
||||
_marker.apply(src._marker);
|
||||
_deleted_at.apply(src._deleted_at, _marker);
|
||||
}
|
||||
@@ -1576,6 +1581,16 @@ row::row(const schema& s, column_kind kind, const row& o) : _size(o._size)
|
||||
_cells.clone_from(o._cells, clone_cell_and_hash);
|
||||
}
|
||||
|
||||
row row::construct(const schema& our_schema, const schema& their_schema, column_kind kind, const row& o) {
|
||||
if (our_schema.version() == their_schema.version()) {
|
||||
return row(our_schema, kind, o);
|
||||
} else {
|
||||
row r;
|
||||
r.apply(our_schema, their_schema, kind, o);
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
row::~row() {
|
||||
}
|
||||
|
||||
@@ -1640,7 +1655,32 @@ row& row::operator=(row&& other) noexcept {
|
||||
return *this;
|
||||
}
|
||||
|
||||
void row::apply(const schema& s, column_kind kind, row&& other) {
|
||||
apply_monotonically(s, kind, std::move(other));
|
||||
}
|
||||
|
||||
void row::apply(const schema& s, column_kind kind, const row& other) {
|
||||
apply_monotonically(s, kind, other);
|
||||
}
|
||||
|
||||
void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) {
|
||||
apply_monotonically(our_schema, their_schema, kind, std::move(other));
|
||||
};
|
||||
|
||||
void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) {
|
||||
apply_monotonically(our_schema, their_schema, kind, other);
|
||||
};
|
||||
|
||||
void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
|
||||
if (other.empty()) {
|
||||
return;
|
||||
}
|
||||
other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
|
||||
apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
|
||||
});
|
||||
}
|
||||
|
||||
void row::apply_monotonically(const schema& s, column_kind kind, const row& other) {
|
||||
if (other.empty()) {
|
||||
return;
|
||||
}
|
||||
@@ -1649,16 +1689,29 @@ void row::apply(const schema& s, column_kind kind, const row& other) {
|
||||
});
|
||||
}
|
||||
|
||||
void row::apply(const schema& s, column_kind kind, row&& other) {
|
||||
apply_monotonically(s, kind, std::move(other));
|
||||
}
|
||||
|
||||
void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
|
||||
if (other.empty()) {
|
||||
return;
|
||||
void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) {
|
||||
if (our_schema.version() == their_schema.version()) {
|
||||
return apply_monotonically(our_schema, kind, std::move(other));
|
||||
}
|
||||
other.consume_with([&] (column_id id, cell_and_hash& c_a_h) {
|
||||
apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash));
|
||||
const column_definition& their_col = their_schema.column_at(kind, id);
|
||||
const column_definition* our_col = our_schema.get_column_definition(their_col.name());
|
||||
if (our_col) {
|
||||
converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) {
|
||||
if (our_schema.version() == their_schema.version()) {
|
||||
return apply_monotonically(our_schema, kind, other);
|
||||
}
|
||||
other.for_each_cell([&] (column_id id, const cell_and_hash& c_a_h) {
|
||||
const column_definition& their_col = their_schema.column_at(kind, id);
|
||||
const column_definition* our_col = our_schema.get_column_definition(their_col.name());
|
||||
if (our_col) {
|
||||
converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1873,19 +1926,19 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
|
||||
return !is_missing() && _ttl != dead;
|
||||
}
|
||||
|
||||
mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const
|
||||
mutation_partition mutation_partition::difference(const schema& s, const mutation_partition& other) const
|
||||
{
|
||||
check_schema(*s);
|
||||
check_schema(s);
|
||||
mutation_partition mp(s);
|
||||
if (_tombstone > other._tombstone) {
|
||||
mp.apply(_tombstone);
|
||||
}
|
||||
mp._static_row = _static_row.difference(*s, column_kind::static_column, other._static_row);
|
||||
mp._static_row = _static_row.difference(s, column_kind::static_column, other._static_row);
|
||||
|
||||
mp._row_tombstones = _row_tombstones.difference(*s, other._row_tombstones);
|
||||
mp._row_tombstones = _row_tombstones.difference(s, other._row_tombstones);
|
||||
|
||||
auto it_r = other._rows.begin();
|
||||
rows_entry::compare cmp_r(*s);
|
||||
rows_entry::compare cmp_r(s);
|
||||
for (auto&& r : _rows) {
|
||||
if (r.dummy()) {
|
||||
continue;
|
||||
@@ -1893,12 +1946,12 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p
|
||||
while (it_r != other._rows.end() && (it_r->dummy() || cmp_r(*it_r, r))) {
|
||||
++it_r;
|
||||
}
|
||||
if (it_r == other._rows.end() || !it_r->key().equal(*s, r.key())) {
|
||||
mp.insert_row(*s, r.key(), r.row());
|
||||
if (it_r == other._rows.end() || !it_r->key().equal(s, r.key())) {
|
||||
mp.insert_row(s, r.key(), r.row());
|
||||
} else {
|
||||
auto dr = r.row().difference(*s, column_kind::regular_column, it_r->row());
|
||||
auto dr = r.row().difference(s, column_kind::regular_column, it_r->row());
|
||||
if (!dr.empty()) {
|
||||
mp.insert_row(*s, r.key(), std::move(dr));
|
||||
mp.insert_row(s, r.key(), std::move(dr));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1936,7 +1989,7 @@ void mutation_partition::accept(const schema& s, mutation_partition_visitor& v)
|
||||
void
|
||||
mutation_partition::upgrade(const schema& old_schema, const schema& new_schema) {
|
||||
// We need to copy to provide strong exception guarantees.
|
||||
mutation_partition tmp(new_schema.shared_from_this());
|
||||
mutation_partition tmp(new_schema);
|
||||
tmp.set_static_row_continuous(_static_row_continuous);
|
||||
converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp);
|
||||
accept(old_schema, v);
|
||||
|
||||
@@ -97,6 +97,7 @@ public:
|
||||
row();
|
||||
~row();
|
||||
row(const schema&, column_kind, const row&);
|
||||
static row construct(const schema& our_schema, const schema& their_schema, column_kind, const row&);
|
||||
row(row&& other) noexcept;
|
||||
row& operator=(row&& other) noexcept;
|
||||
size_t size() const { return _size; }
|
||||
@@ -178,10 +179,15 @@ public:
|
||||
|
||||
// Weak exception guarantees
|
||||
void apply(const schema&, column_kind, const row& src);
|
||||
// Weak exception guarantees
|
||||
void apply(const schema&, column_kind, row&& src);
|
||||
void apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other);
|
||||
void apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other);
|
||||
|
||||
// Monotonic exception guarantees
|
||||
void apply_monotonically(const schema&, column_kind, row&& src);
|
||||
void apply_monotonically(const schema&, column_kind, const row& src);
|
||||
void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind, row&& src);
|
||||
void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind, const row& src);
|
||||
|
||||
// Expires cells based on query_time. Expires tombstones based on gc_before
|
||||
// and max_purgeable. Removes cells covered by tomb.
|
||||
@@ -416,6 +422,14 @@ public:
|
||||
get_existing().apply_monotonically(s, kind, std::move(src.get_existing()));
|
||||
}
|
||||
|
||||
// Monotonic exception guarantees
|
||||
void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, lazy_row&& src) {
|
||||
if (src.empty()) {
|
||||
return;
|
||||
}
|
||||
maybe_create().apply_monotonically(our_schema, their_schema, kind, std::move(src.get_existing()));
|
||||
}
|
||||
|
||||
// Expires cells based on query_time. Expires tombstones based on gc_before
|
||||
// and max_purgeable. Removes cells covered by tomb.
|
||||
// Returns true iff there are any live cells left.
|
||||
@@ -820,6 +834,11 @@ public:
|
||||
, _marker(other._marker)
|
||||
, _cells(s, column_kind::regular_column, other._cells)
|
||||
{ }
|
||||
deletable_row(const schema& our_schema, const schema& their_schema, const deletable_row& other)
|
||||
: _deleted_at(other._deleted_at)
|
||||
, _marker(other._marker)
|
||||
, _cells(row::construct(our_schema, their_schema, column_kind::regular_column, other._cells))
|
||||
{ }
|
||||
deletable_row(row_tombstone&& tomb, row_marker&& marker, row&& cells)
|
||||
: _deleted_at(std::move(tomb)), _marker(std::move(marker)), _cells(std::move(cells))
|
||||
{}
|
||||
@@ -852,10 +871,15 @@ public:
|
||||
|
||||
// Weak exception guarantees. After exception, both src and this will commute to the same value as
|
||||
// they would should the exception not happen.
|
||||
void apply(const schema& s, const deletable_row& src);
|
||||
void apply(const schema& s, deletable_row&& src);
|
||||
void apply_monotonically(const schema& s, const deletable_row& src);
|
||||
void apply_monotonically(const schema& s, deletable_row&& src);
|
||||
void apply(const schema&, deletable_row&& src);
|
||||
void apply(const schema&, const deletable_row& src);
|
||||
void apply(const schema& our_schema, const schema& their_schema, const deletable_row& src);
|
||||
void apply(const schema& our_schema, const schema& their_schema, deletable_row&& src);
|
||||
|
||||
void apply_monotonically(const schema&, deletable_row&& src);
|
||||
void apply_monotonically(const schema&, const deletable_row& src);
|
||||
void apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src);
|
||||
void apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src);
|
||||
public:
|
||||
row_tombstone deleted_at() const { return _deleted_at; }
|
||||
api::timestamp_type created_at() const { return _marker.timestamp(); }
|
||||
@@ -956,6 +980,12 @@ public:
|
||||
, _range_tombstone(e._range_tombstone)
|
||||
, _flags(e._flags)
|
||||
{ }
|
||||
rows_entry(const schema& our_schema, const schema& their_schema, const rows_entry& e)
|
||||
: _key(e._key)
|
||||
, _row(our_schema, their_schema, e._row)
|
||||
, _range_tombstone(e._range_tombstone)
|
||||
, _flags(e._flags)
|
||||
{ }
|
||||
// Valid only if !dummy()
|
||||
clustering_key& key() {
|
||||
return _key;
|
||||
@@ -989,7 +1019,10 @@ public:
|
||||
_row.apply(t);
|
||||
}
|
||||
void apply_monotonically(const schema& s, rows_entry&& e) {
|
||||
_row.apply(s, std::move(e._row));
|
||||
_row.apply_monotonically(s, std::move(e._row));
|
||||
}
|
||||
void apply_monotonically(const schema& our_schema, const schema& their_schema, rows_entry&& e) {
|
||||
_row.apply_monotonically(our_schema, their_schema, std::move(e._row));
|
||||
}
|
||||
bool empty() const {
|
||||
return _row.empty();
|
||||
@@ -1193,11 +1226,11 @@ public:
|
||||
static mutation_partition make_incomplete(const schema& s, tombstone t = {}) {
|
||||
return mutation_partition(incomplete_tag(), s, t);
|
||||
}
|
||||
mutation_partition(schema_ptr s)
|
||||
mutation_partition(const schema& s)
|
||||
: _rows()
|
||||
, _row_tombstones(*s)
|
||||
, _row_tombstones(s)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s->version())
|
||||
, _schema_version(s.version())
|
||||
#endif
|
||||
{ }
|
||||
mutation_partition(mutation_partition& other, copy_comparators_only)
|
||||
@@ -1279,14 +1312,14 @@ public:
|
||||
// is not representable in this_schema is dropped, thus apply() loses commutativity.
|
||||
//
|
||||
// Weak exception guarantees.
|
||||
// Assumes this and p are not owned by a cache_tracker.
|
||||
void apply(const schema& this_schema, const mutation_partition& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
// Use in case this instance and p share the same schema.
|
||||
// Same guarantees as apply(const schema&, mutation_partition&&, const schema&);
|
||||
void apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats);
|
||||
// Same guarantees and constraints as for apply(const schema&, const mutation_partition&, const schema&).
|
||||
void apply(const schema& this_schema, mutation_partition_view p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
// Use in case this instance and p share the same schema.
|
||||
// Same guarantees and constraints as for other variants of apply().
|
||||
void apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats);
|
||||
|
||||
// Applies p to this instance.
|
||||
//
|
||||
@@ -1321,15 +1354,6 @@ public:
|
||||
stop_iteration apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
|
||||
// Weak exception guarantees.
|
||||
// Assumes this and p are not owned by a cache_tracker.
|
||||
void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
void apply_weak(const schema& s, mutation_partition&&,
|
||||
mutation_application_stats& app_stats);
|
||||
void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
|
||||
// Converts partition to the new schema. When succeeds the partition should only be accessed
|
||||
// using the new schema.
|
||||
//
|
||||
@@ -1397,7 +1421,7 @@ public:
|
||||
// Returns the minimal mutation_partition that when applied to "other" will
|
||||
// create a mutation_partition equal to the sum of other and this one.
|
||||
// This and other must both be governed by the same schema s.
|
||||
mutation_partition difference(schema_ptr s, const mutation_partition& other) const;
|
||||
mutation_partition difference(const schema& s, const mutation_partition& other) const;
|
||||
|
||||
// Returns a subset of this mutation holding only information relevant for given clustering ranges.
|
||||
// Range tombstones will be trimmed to the boundaries of the clustering ranges.
|
||||
|
||||
@@ -66,7 +66,7 @@ mutation_partition_v2::mutation_partition_v2(const schema& s, mutation_partition
|
||||
auto&& tombstones = x.mutable_row_tombstones();
|
||||
if (!tombstones.empty()) {
|
||||
try {
|
||||
mutation_partition_v2 p(s.shared_from_this());
|
||||
mutation_partition_v2 p(s);
|
||||
|
||||
for (auto&& t: tombstones) {
|
||||
range_tombstone & rt = t.tombstone();
|
||||
@@ -75,8 +75,7 @@ mutation_partition_v2::mutation_partition_v2(const schema& s, mutation_partition
|
||||
.set_range_tombstone(rt.tomb);
|
||||
}
|
||||
|
||||
mutation_application_stats app_stats;
|
||||
apply_monotonically(s, std::move(p), s, app_stats);
|
||||
apply(s, std::move(p));
|
||||
} catch (...) {
|
||||
_rows.clear_and_dispose(current_deleter<rows_entry>());
|
||||
throw;
|
||||
@@ -118,20 +117,28 @@ struct fmt::formatter<apply_resume> : fmt::formatter<std::string_view> {
|
||||
}
|
||||
};
|
||||
|
||||
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
|
||||
mutation_application_stats& app_stats, is_preemptible preemptible, apply_resume& res, is_evictable evictable) {
|
||||
return apply_monotonically(s, std::move(p), tracker, app_stats,
|
||||
preemptible ? default_preemption_check() : never_preempt(), res, evictable);
|
||||
void mutation_partition_v2::apply(const schema& s, mutation_partition&& p) {
|
||||
apply(s, mutation_partition_v2(s, std::move(p)));
|
||||
}
|
||||
void mutation_partition_v2::apply(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker, is_evictable evictable) {
|
||||
mutation_application_stats app_stats;
|
||||
apply_resume res;
|
||||
apply_monotonically(s, s, std::move(p), tracker, app_stats, never_preempt(), res, evictable);
|
||||
}
|
||||
|
||||
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
|
||||
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, const schema& p_s, mutation_partition_v2&& p, cache_tracker* tracker,
|
||||
mutation_application_stats& app_stats, preemption_check need_preempt, apply_resume& res, is_evictable evictable) {
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(s.version() == _schema_version);
|
||||
assert(p._schema_version == _schema_version);
|
||||
assert(_schema_version == s.version());
|
||||
assert(p._schema_version == p_s.version());
|
||||
#endif
|
||||
bool same_schema = s.version() == p_s.version();
|
||||
_tombstone.apply(p._tombstone);
|
||||
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
|
||||
if (same_schema) [[likely]] {
|
||||
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
|
||||
} else {
|
||||
_static_row.apply_monotonically(s, p_s, column_kind::static_column, std::move(p._static_row));
|
||||
}
|
||||
_static_row_continuous |= p._static_row_continuous;
|
||||
|
||||
rows_entry::tri_compare cmp(s);
|
||||
@@ -215,7 +222,6 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
alloc_strategy_unique_ptr<rows_entry> p_sentinel;
|
||||
alloc_strategy_unique_ptr<rows_entry> this_sentinel;
|
||||
auto insert_sentinel_back = defer([&] {
|
||||
// Insert this_sentinel before sentinel so that the former lands before the latter in LRU.
|
||||
if (this_sentinel) {
|
||||
assert(p_i != p._rows.end());
|
||||
auto rt = this_sentinel->range_tombstone();
|
||||
@@ -307,8 +313,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
|
||||
if (need_preempt()) {
|
||||
auto s1 = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s,
|
||||
position_in_partition::after_key(s, lb_i->position()), is_dummy::yes, is_continuous::no));
|
||||
current_allocator().construct<rows_entry>(p_s,
|
||||
position_in_partition::after_key(p_s, lb_i->position()), is_dummy::yes, is_continuous::no));
|
||||
alloc_strategy_unique_ptr<rows_entry> s2;
|
||||
if (lb_i->position().is_clustering_row()) {
|
||||
s2 = alloc_strategy_unique_ptr<rows_entry>(
|
||||
@@ -347,8 +353,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
if (next_interval_loaded) {
|
||||
// FIXME: Avoid reallocation
|
||||
s1 = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s,
|
||||
position_in_partition::after_key(s, src_e.position()), is_dummy::yes, is_continuous::no));
|
||||
current_allocator().construct<rows_entry>(p_s,
|
||||
position_in_partition::after_key(p_s, src_e.position()), is_dummy::yes, is_continuous::no));
|
||||
if (src_e.position().is_clustering_row()) {
|
||||
s2 = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s,
|
||||
@@ -361,31 +367,41 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
}
|
||||
}
|
||||
|
||||
rows_type::key_grabber pi_kg(p_i);
|
||||
lb_i = _rows.insert_before(i, std::move(pi_kg));
|
||||
if (same_schema) [[likely]] {
|
||||
rows_type::key_grabber pi_kg(p_i);
|
||||
lb_i = _rows.insert_before(i, std::move(pi_kg));
|
||||
} else {
|
||||
// FIXME: avoid cell reallocation.
|
||||
// We are copying the row to make exception safety simpler,
|
||||
// but it's not inherently necessary and could be avoided.
|
||||
auto new_e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, p_s, src_e));
|
||||
lb_i = _rows.insert_before(i, std::move(new_e));
|
||||
lb_i->swap(src_e);
|
||||
p_i = p._rows.erase_and_dispose(p_i, del);
|
||||
}
|
||||
p_sentinel = std::move(s1);
|
||||
this_sentinel = std::move(s2);
|
||||
|
||||
// Check if src_e falls into a continuous range.
|
||||
// Check if src_e (now: lb_i) fell into a continuous range.
|
||||
// The range past the last entry is also always implicitly continuous.
|
||||
if (i == _rows.end() || i->continuous()) {
|
||||
tombstone i_rt = i != _rows.end() ? i->range_tombstone() : tombstone();
|
||||
// Cannot apply only-row range tombstone falling into a continuous range without inserting extra entry.
|
||||
// Should not occur in practice due to the "older versions are evicted first" rule.
|
||||
// Never occurs in non-evictable snapshots because they are continuous.
|
||||
if (!src_e.continuous() && src_e.range_tombstone() > i_rt) {
|
||||
if (src_e.dummy()) {
|
||||
if (!lb_i->continuous() && lb_i->range_tombstone() > i_rt) {
|
||||
if (lb_i->dummy()) {
|
||||
lb_i->set_range_tombstone(i_rt);
|
||||
} else {
|
||||
position_in_partition_view i_pos = i != _rows.end() ? i->position()
|
||||
: position_in_partition_view::after_all_clustered_rows();
|
||||
// See the "no singular tombstones" rule.
|
||||
mplog.error("Cannot merge entry {} with rt={}, cont=0 into continuous range before {} with rt={}",
|
||||
src_e.position(), src_e.range_tombstone(), i_pos, i_rt);
|
||||
lb_i->position(), lb_i->range_tombstone(), i_pos, i_rt);
|
||||
abort();
|
||||
}
|
||||
} else {
|
||||
lb_i->set_range_tombstone(src_e.range_tombstone() + i_rt);
|
||||
lb_i->set_range_tombstone(lb_i->range_tombstone() + i_rt);
|
||||
}
|
||||
lb_i->set_continuous(true);
|
||||
}
|
||||
@@ -397,8 +413,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
if (next_interval_loaded) {
|
||||
// FIXME: Avoid reallocation
|
||||
s1 = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s,
|
||||
position_in_partition::after_key(s, src_e.position()), is_dummy::yes, is_continuous::no));
|
||||
current_allocator().construct<rows_entry>(p_s,
|
||||
position_in_partition::after_key(p_s, src_e.position()), is_dummy::yes, is_continuous::no));
|
||||
if (src_e.position().is_clustering_row()) {
|
||||
s2 = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, s1->position(), is_dummy::yes, is_continuous::yes));
|
||||
@@ -436,18 +452,26 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
}
|
||||
}
|
||||
if (tracker) {
|
||||
// Newer evictable versions store complete rows
|
||||
i->row() = std::move(src_e.row());
|
||||
// Need to preserve the LRU link of the later version in case it's
|
||||
// the last dummy entry which holds the partition entry linked in LRU.
|
||||
i->swap(src_e);
|
||||
if (same_schema) [[likely]] {
|
||||
// Newer evictable versions store complete rows
|
||||
i->row() = std::move(src_e.row());
|
||||
// Need to preserve the LRU link of the later version in case it's
|
||||
// the last dummy entry which holds the partition entry linked in LRU.
|
||||
i->swap(src_e);
|
||||
} else {
|
||||
i->apply_monotonically(s, p_s, std::move(src_e));
|
||||
}
|
||||
tracker->remove(src_e);
|
||||
} else {
|
||||
// Avoid row compaction if no newer range tombstone.
|
||||
do_compact = (src_e.range_tombstone() + src_e.row().deleted_at().regular()) >
|
||||
(i->range_tombstone() + i->row().deleted_at().regular());
|
||||
memory::on_alloc_point();
|
||||
i->apply_monotonically(s, std::move(src_e));
|
||||
if (same_schema) [[likely]] {
|
||||
i->apply_monotonically(s, std::move(src_e));
|
||||
} else {
|
||||
i->apply_monotonically(s, p_s, std::move(src_e));
|
||||
}
|
||||
}
|
||||
++app_stats.row_hits;
|
||||
p_i = p._rows.erase_and_dispose(p_i, del);
|
||||
@@ -491,59 +515,6 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats, is_preemptible preemptible, apply_resume& res, is_evictable evictable) {
|
||||
if (s.version() == p_schema.version()) {
|
||||
return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats,
|
||||
preemptible ? default_preemption_check() : never_preempt(), res, evictable);
|
||||
} else {
|
||||
mutation_partition_v2 p2(s, p);
|
||||
p2.upgrade(p_schema, s);
|
||||
return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, never_preempt(), res, evictable); // FIXME: make preemptible
|
||||
}
|
||||
}
|
||||
|
||||
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker *tracker,
|
||||
mutation_application_stats& app_stats, is_evictable evictable) {
|
||||
apply_resume res;
|
||||
return apply_monotonically(s, std::move(p), tracker, app_stats, is_preemptible::no, res, evictable);
|
||||
}
|
||||
|
||||
stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats) {
|
||||
apply_resume res;
|
||||
return apply_monotonically(s, std::move(p), p_schema, app_stats, is_preemptible::no, res, is_evictable::no);
|
||||
}
|
||||
|
||||
void mutation_partition_v2::apply(const schema& s, const mutation_partition_v2& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats) {
|
||||
apply_monotonically(s, mutation_partition_v2(p_schema, std::move(p)), p_schema, app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition_v2::apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats) {
|
||||
apply_monotonically(s, mutation_partition_v2(s, std::move(p)), no_cache_tracker, app_stats, is_evictable::no);
|
||||
}
|
||||
|
||||
void
|
||||
mutation_partition_v2::apply_weak(const schema& s, mutation_partition_view p,
|
||||
const schema& p_schema, mutation_application_stats& app_stats) {
|
||||
// FIXME: Optimize
|
||||
mutation_partition p2(p_schema.shared_from_this());
|
||||
partition_builder b(p_schema, p2);
|
||||
p.accept(p_schema, b);
|
||||
apply_monotonically(s, mutation_partition_v2(p_schema, std::move(p2)), p_schema, app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition_v2::apply_weak(const schema& s, const mutation_partition& p,
|
||||
const schema& p_schema, mutation_application_stats& app_stats) {
|
||||
// FIXME: Optimize
|
||||
apply_monotonically(s, mutation_partition_v2(s, p), p_schema, app_stats);
|
||||
}
|
||||
|
||||
void mutation_partition_v2::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
|
||||
apply_monotonically(s, mutation_partition_v2(s, std::move(p)), no_cache_tracker, app_stats, is_evictable::no);
|
||||
}
|
||||
|
||||
void
|
||||
mutation_partition_v2::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) {
|
||||
check_schema(schema);
|
||||
@@ -555,10 +526,9 @@ mutation_partition_v2::apply_row_tombstone(const schema& schema, clustering_key_
|
||||
void
|
||||
mutation_partition_v2::apply_row_tombstone(const schema& schema, range_tombstone rt) {
|
||||
check_schema(schema);
|
||||
mutation_partition mp(schema.shared_from_this());
|
||||
mutation_partition mp(schema);
|
||||
mp.apply_row_tombstone(schema, std::move(rt));
|
||||
mutation_application_stats stats;
|
||||
apply_weak(schema, std::move(mp), stats);
|
||||
apply(schema, std::move(mp));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -965,7 +935,7 @@ void mutation_partition_v2::accept(const schema& s, mutation_partition_visitor&
|
||||
void
|
||||
mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schema) {
|
||||
// We need to copy to provide strong exception guarantees.
|
||||
mutation_partition tmp(new_schema.shared_from_this());
|
||||
mutation_partition tmp(new_schema);
|
||||
tmp.set_static_row_continuous(_static_row_continuous);
|
||||
converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp);
|
||||
accept(old_schema, v);
|
||||
@@ -973,7 +943,7 @@ mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schem
|
||||
}
|
||||
|
||||
mutation_partition mutation_partition_v2::as_mutation_partition(const schema& s) const {
|
||||
mutation_partition tmp(s.shared_from_this());
|
||||
mutation_partition tmp(s);
|
||||
tmp.set_static_row_continuous(_static_row_continuous);
|
||||
partition_builder v(s, tmp);
|
||||
accept(s, v);
|
||||
|
||||
@@ -89,10 +89,10 @@ public:
|
||||
static mutation_partition_v2 make_incomplete(const schema& s, tombstone t = {}) {
|
||||
return mutation_partition_v2(incomplete_tag(), s, t);
|
||||
}
|
||||
mutation_partition_v2(schema_ptr s)
|
||||
mutation_partition_v2(const schema& s)
|
||||
: _rows()
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s->version())
|
||||
, _schema_version(s.version())
|
||||
#endif
|
||||
{ }
|
||||
mutation_partition_v2(mutation_partition_v2& other, copy_comparators_only)
|
||||
@@ -167,18 +167,11 @@ public:
|
||||
// prefix must not be full
|
||||
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
|
||||
void apply_row_tombstone(const schema& schema, range_tombstone rt);
|
||||
//
|
||||
// Applies p to current object.
|
||||
//
|
||||
// Commutative when this_schema == p_schema. If schemas differ, data in p which
|
||||
// is not representable in this_schema is dropped, thus apply() loses commutativity.
|
||||
//
|
||||
// Weak exception guarantees.
|
||||
void apply(const schema& this_schema, const mutation_partition_v2& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
// Use in case this instance and p share the same schema.
|
||||
// Same guarantees as apply(const schema&, mutation_partition_v2&&, const schema&);
|
||||
void apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats);
|
||||
// Assumes this and p are not owned by a cache_tracker and non-evictable.
|
||||
void apply(const schema& s, mutation_partition&&);
|
||||
void apply(const schema& s, mutation_partition_v2&& p, cache_tracker* = nullptr, is_evictable evictable = is_evictable::no);
|
||||
|
||||
// Applies p to this instance.
|
||||
//
|
||||
@@ -204,25 +197,8 @@ public:
|
||||
//
|
||||
// If is_preemptible::yes is passed, apply_resume must also be passed,
|
||||
// same instance each time until stop_iteration::yes is returned.
|
||||
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*,
|
||||
mutation_application_stats& app_stats, is_preemptible, apply_resume&, is_evictable);
|
||||
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats, is_preemptible, apply_resume&, is_evictable);
|
||||
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
|
||||
mutation_application_stats& app_stats, is_evictable);
|
||||
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*,
|
||||
mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable);
|
||||
|
||||
// Weak exception guarantees.
|
||||
// Assumes this and p are not owned by a cache_tracker and non-evictable.
|
||||
void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
void apply_weak(const schema& s, mutation_partition&&,
|
||||
mutation_application_stats& app_stats);
|
||||
void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
stop_iteration apply_monotonically(const schema& this_schema, const schema& p_schema, mutation_partition_v2&& p,
|
||||
cache_tracker*, mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable);
|
||||
|
||||
// Converts partition to the new schema. When succeeds the partition should only be accessed
|
||||
// using the new schema.
|
||||
|
||||
@@ -34,6 +34,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current, mutation_
|
||||
partition_version::partition_version(partition_version&& pv) noexcept
|
||||
: anchorless_list_base_hook(std::move(pv))
|
||||
, _backref(pv._backref)
|
||||
, _schema(std::move(pv._schema))
|
||||
, _is_being_upgraded(pv._is_being_upgraded)
|
||||
, _partition(std::move(pv._partition))
|
||||
{
|
||||
if (_backref) {
|
||||
@@ -56,15 +58,20 @@ partition_version::~partition_version()
|
||||
if (_backref) {
|
||||
_backref->_version = nullptr;
|
||||
}
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
// Destroying the schema_ptr can cause a destruction of the schema,
|
||||
// so it has to happen in the allocator which schemas are allocated in.
|
||||
_schema = nullptr;
|
||||
});
|
||||
}
|
||||
|
||||
// Incrementally frees this version's partition data, unlinking rows from
// the tracker (if any). Returns stop_iteration::yes once fully cleared;
// callers re-invoke on stop_iteration::no to continue preemptibly.
stop_iteration partition_version::clear_gently(cache_tracker* tracker) noexcept {
    return _partition.clear_gently(tracker);
}
|
||||
|
||||
size_t partition_version::size_in_allocator(const schema& s, allocation_strategy& allocator) const {
|
||||
size_t partition_version::size_in_allocator(allocation_strategy& allocator) const {
|
||||
return allocator.object_memory_size_in_allocator(this) +
|
||||
partition().external_memory_usage(s);
|
||||
partition().external_memory_usage(*_schema);
|
||||
}
|
||||
|
||||
namespace {
|
||||
@@ -111,15 +118,20 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
|
||||
}
|
||||
|
||||
// Returns the static row of this snapshot, squashed across all versions.
//
// The span interleaved the pre-patch implementation (::squashed<row> helper,
// single snapshot-wide _schema) with the post-patch one; collapsed to the
// post-patch form, which handles versions with heterogeneous schemas by
// upgrading each version's static row to the newest version's schema
// while merging.
::static_row partition_snapshot::static_row(bool digest_requested) const {
    const partition_version* this_v = &*version();
    // Walk from the oldest version towards the newest so that newer
    // writes are applied on top of older ones.
    partition_version* it = this_v->last();
    if (digest_requested) {
        it->partition().static_row().prepare_hash(*it->get_schema(), column_kind::static_column);
    }
    // Seed with the oldest version's static row, converted from its own
    // schema to the newest version's schema.
    row r = row::construct(*this_v->get_schema(), *it->get_schema(), column_kind::static_column, it->partition().static_row().get());
    while (it != this_v) {
        it = it->prev();
        if (digest_requested) {
            it->partition().static_row().prepare_hash(*it->get_schema(), column_kind::static_column);
        }
        // Upgrading apply(): merges a row expressed in it's schema into r,
        // which is kept in the newest version's schema.
        r.apply(*this_v->get_schema(), *it->get_schema(), column_kind::static_column, it->partition().static_row().get());
    }
    return ::static_row(std::move(r));
}
|
||||
|
||||
bool partition_snapshot::static_row_continuous() const {
|
||||
@@ -133,15 +145,16 @@ tombstone partition_snapshot::partition_tombstone() const {
|
||||
}
|
||||
|
||||
// Squashes all versions visible from this snapshot into a single v1
// mutation_partition.
//
// The span interleaved the pre-patch implementation (::squashed<> helper,
// snapshot-wide _schema) with the post-patch one; collapsed to the
// post-patch form. Versions may now carry different schemas: each one is
// converted to a v1 partition under its own schema, and the upgrading
// apply() merges it into the result, which uses the newest version's schema.
mutation_partition partition_snapshot::squashed() const {
    const partition_version* this_v = &*version();
    mutation_partition mp(*this_v->get_schema());
    // Oldest first, so newer data overwrites older data.
    for (auto it = this_v->last();; it = it->prev()) {
        mutation_application_stats app_stats;
        mp.apply(*this_v->get_schema(), it->partition().as_mutation_partition(*it->get_schema()), *it->get_schema(), app_stats);
        if (it == this_v) {
            break;
        }
    }
    return mp;
}
|
||||
|
||||
tombstone partition_entry::partition_tombstone() const {
|
||||
@@ -166,19 +179,43 @@ partition_snapshot::~partition_snapshot() {
|
||||
}
|
||||
|
||||
// Merges two adjacent versions (same schema s) into `newer`.
//
// The span contained both the pre-patch call (apply_monotonically with a
// local app_stats) and the post-patch call; collapsed to the post-patch
// form using the stats-free apply() overload.
void merge_versions(const schema& s, mutation_partition_v2& newer, mutation_partition_v2&& older, cache_tracker* tracker, is_evictable evictable) {
    // Merge newer into older (the newer side is typically the smaller one),
    // then move the merged result back into `newer`.
    older.apply(s, std::move(newer), tracker, evictable);
    newer = std::move(older);
}
|
||||
|
||||
// Inserts a new version after pv.
|
||||
// Used only when upgrading the schema of pv.
|
||||
static partition_version& append_version(partition_version& pv, const schema& s, cache_tracker* tracker) {
|
||||
// Every evictable version must have a dummy entry at the end so that
|
||||
// it can be tracked in the LRU. It is also needed to allow old versions
|
||||
// to stay around (with tombstones and static rows) after fully evicted.
|
||||
// Such versions must be fully discontinuous, and thus have a dummy at the end.
|
||||
auto new_version = tracker
|
||||
? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s), s.shared_from_this())
|
||||
: current_allocator().construct<partition_version>(mutation_partition_v2(s), s.shared_from_this());
|
||||
new_version->partition().set_static_row_continuous(pv.partition().static_row_continuous());
|
||||
new_version->insert_after(pv);
|
||||
if (tracker) {
|
||||
tracker->insert(*new_version);
|
||||
}
|
||||
return *new_version;
|
||||
}
|
||||
|
||||
stop_iteration partition_snapshot::merge_partition_versions(mutation_application_stats& app_stats) {
|
||||
partition_version_ref& v = version();
|
||||
if (!v.is_unique_owner()) {
|
||||
// Shift _version to the oldest unreferenced version and then keep merging left hand side into it.
|
||||
// This is good for performance because in case we were at the latest version
|
||||
// we leave it for incoming writes and they don't have to create a new one.
|
||||
//
|
||||
// If `current->next()` has a different schema than `current`, it will have
|
||||
// to be upgraded before being merged with `current`.
|
||||
// If its upgrade is already in progress, it would be wasteful (though legal)
|
||||
// to initiate its upgrade again, so we stop shifting.
|
||||
//
|
||||
// See the documentation in partition_version.hh for additional info about upgrades.
|
||||
partition_version* current = &*v;
|
||||
while (current->next() && !current->next()->is_referenced()) {
|
||||
while (current->next() && !current->next()->is_referenced() && !current->next()->_is_being_upgraded) {
|
||||
current = current->next();
|
||||
_version = partition_version_ref(*current);
|
||||
_version_merging_state.reset();
|
||||
@@ -190,8 +227,32 @@ stop_iteration partition_snapshot::merge_partition_versions(mutation_application
|
||||
if (!_version_merging_state) {
|
||||
_version_merging_state = apply_resume();
|
||||
}
|
||||
const auto do_stop_iteration = current->partition().apply_monotonically(*schema(),
|
||||
std::move(prev->partition()), _tracker, local_app_stats, is_preemptible::yes, *_version_merging_state,
|
||||
if (!prev->_is_being_upgraded && prev->get_schema()->version() != current->get_schema()->version()) {
|
||||
// The versions we are attempting to merge have different schemas.
|
||||
// In this scenario the older version has to be upgraded before
|
||||
// being merged with the newer one.
|
||||
//
|
||||
// This is done by adding a fresh empty version (with the newer
|
||||
// schema) after `current` and merging `current` into the new
|
||||
// version.
|
||||
//
|
||||
// While the upgrade is happening, `_is_being_upgraded` is set
|
||||
// in the version which is being upgraded, to mark it as having
|
||||
// older schema than its `next()` (and therefore violating the
|
||||
// normal chronological schema order). This is necessary
|
||||
// precisely for the above `if`, so that after resuming a
|
||||
// preempted upgrade we can simply continue, instead of
|
||||
// (illegally) initiating an upgrade of the special fresh
|
||||
// version back to the old schema.
|
||||
//
|
||||
// See the documentation in partition_version.hh for additional info about upgrades.
|
||||
current = &append_version(*current, *prev->get_schema(), _tracker);
|
||||
_version = partition_version_ref(*current);
|
||||
prev = current->prev();
|
||||
prev->_is_being_upgraded = true;
|
||||
}
|
||||
const auto do_stop_iteration = current->partition().apply_monotonically(*current->get_schema(),
|
||||
*prev->get_schema(), std::move(prev->partition()), _tracker, local_app_stats, default_preemption_check(), *_version_merging_state,
|
||||
is_evictable(bool(_tracker)));
|
||||
app_stats.row_hits += local_app_stats.row_hits;
|
||||
if (do_stop_iteration == stop_iteration::no) {
|
||||
@@ -200,6 +261,7 @@ stop_iteration partition_snapshot::merge_partition_versions(mutation_application
|
||||
// If do_stop_iteration is yes, we have to remove the previous version.
|
||||
// It now appears as fully continuous because it is empty.
|
||||
_version_merging_state.reset();
|
||||
prev->_is_being_upgraded = false;
|
||||
if (prev->is_referenced()) {
|
||||
_version.release();
|
||||
prev->back_reference() = partition_version_ref(*current, prev->back_reference().is_unique_owner());
|
||||
@@ -222,7 +284,7 @@ stop_iteration partition_snapshot::slide_to_oldest() noexcept {
|
||||
_entry = nullptr;
|
||||
}
|
||||
partition_version* current = &*v;
|
||||
while (current->next() && !current->next()->is_referenced()) {
|
||||
while (current->next() && !current->next()->is_referenced() && !current->next()->_is_being_upgraded) {
|
||||
current = current->next();
|
||||
_version = partition_version_ref(*current);
|
||||
}
|
||||
@@ -239,18 +301,18 @@ unsigned partition_snapshot::version_count()
|
||||
return count;
|
||||
}
|
||||
|
||||
// Constructs a partition_entry owning a single version holding `mp`.
//
// The span contained both the pre-patch form (no schema argument) and the
// post-patch form; collapsed to the post-patch form, which passes the
// schema down since partition_version now stores its own schema_ptr.
partition_entry::partition_entry(const schema& s, mutation_partition_v2 mp)
{
    auto new_version = current_allocator().construct<partition_version>(std::move(mp), s.shared_from_this());
    _version = partition_version_ref(*new_version);
}
|
||||
|
||||
// Constructs a partition_entry from a v1 mutation_partition by converting
// it to v2 first.
//
// The span contained both the pre-patch delegation (no schema argument)
// and the post-patch one; collapsed to the post-patch form.
partition_entry::partition_entry(const schema& s, mutation_partition mp)
    : partition_entry(s, mutation_partition_v2(s, std::move(mp)))
{ }
|
||||
|
||||
partition_entry::partition_entry(partition_entry::evictable_tag, const schema& s, mutation_partition&& mp)
|
||||
: partition_entry([&] {
|
||||
: partition_entry(s, [&] {
|
||||
mp.ensure_last_dummy(s);
|
||||
return mutation_partition_v2(s, std::move(mp));
|
||||
}())
|
||||
@@ -329,8 +391,8 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
|
||||
// to stay around (with tombstones and static rows) after fully evicted.
|
||||
// Such versions must be fully discontinuous, and thus have a dummy at the end.
|
||||
auto new_version = tracker
|
||||
? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s))
|
||||
: current_allocator().construct<partition_version>(mutation_partition_v2(s.shared_from_this()));
|
||||
? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s), s.shared_from_this())
|
||||
: current_allocator().construct<partition_version>(mutation_partition_v2(s), s.shared_from_this());
|
||||
new_version->partition().set_static_row_continuous(_version->partition().static_row_continuous());
|
||||
new_version->insert_before(*_version);
|
||||
set_version(new_version);
|
||||
@@ -363,24 +425,24 @@ void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, cons
|
||||
if (s.version() != mp_schema.version()) {
|
||||
mp.upgrade(mp_schema, s);
|
||||
}
|
||||
auto new_version = current_allocator().construct<partition_version>(std::move(mp));
|
||||
auto new_version = current_allocator().construct<partition_version>(std::move(mp), s.shared_from_this());
|
||||
partition_snapshot_ptr snp; // Should die after new_version is inserted
|
||||
if (!_snapshot) {
|
||||
try {
|
||||
apply_resume res;
|
||||
auto notify = cleaner.make_region_space_guard();
|
||||
if (_version->partition().apply_monotonically(s,
|
||||
if (_version->partition().apply_monotonically(s, s,
|
||||
std::move(new_version->partition()),
|
||||
no_cache_tracker,
|
||||
app_stats,
|
||||
is_preemptible::yes,
|
||||
default_preemption_check(),
|
||||
res,
|
||||
is_evictable::no) == stop_iteration::yes) {
|
||||
current_allocator().destroy(new_version);
|
||||
return;
|
||||
} else {
|
||||
// Apply was preempted. Let the cleaner finish the job when snapshot dies
|
||||
snp = read(r, cleaner, s.shared_from_this(), no_cache_tracker);
|
||||
snp = read(r, cleaner, no_cache_tracker);
|
||||
// FIXME: Store res in the snapshot as an optimization to resume from where we left off.
|
||||
}
|
||||
} catch (...) {
|
||||
@@ -418,14 +480,14 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
|
||||
// So we cannot allow erasing when preemptible.
|
||||
bool can_move = !preemptible && !pe._snapshot;
|
||||
|
||||
auto src_snp = pe.read(reg, pe_cleaner, s.shared_from_this(), no_cache_tracker);
|
||||
auto src_snp = pe.read(reg, pe_cleaner, no_cache_tracker);
|
||||
partition_snapshot_ptr prev_snp;
|
||||
if (preemptible) {
|
||||
// Reads must see prev_snp until whole update completes so that writes
|
||||
// are not partially visible.
|
||||
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
|
||||
prev_snp = read(reg, tracker.cleaner(), &tracker, phase - 1);
|
||||
}
|
||||
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
|
||||
auto dst_snp = read(reg, tracker.cleaner(), &tracker, phase);
|
||||
dst_snp->lock();
|
||||
|
||||
// Once we start updating the partition, we must keep all snapshots until the update completes,
|
||||
@@ -547,39 +609,41 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
|
||||
});
|
||||
}
|
||||
|
||||
// Squashes all versions of this entry into one mutation_partition_v2 with
// schema `to`.
//
// The span interleaved the pre-patch signature/body (schema_ptr from/to)
// with the post-patch ones; collapsed to the post-patch form, which takes
// each version's schema from the version itself (versions may differ in
// schema after a gentle upgrade) and upgrades only when needed.
mutation_partition_v2 partition_entry::squashed_v2(const schema& to, is_evictable evictable)
{
    mutation_partition_v2 mp(to);
    mp.set_static_row_continuous(_version->partition().static_row_continuous());
    // Iterates oldest-to-newest; merge_versions makes newer data win.
    for (auto&& v : _version->all_elements()) {
        auto older = mutation_partition_v2(*v.get_schema(), v.partition());
        if (v.get_schema()->version() != to.version()) {
            // Version still carries an older schema: upgrade the copy first.
            older.upgrade(*v.get_schema(), to);
        }
        merge_versions(to, mp, std::move(older), no_cache_tracker, evictable);
    }
    return mp;
}
|
||||
|
||||
// Squashes all versions of this entry into a single v1 mutation_partition
// with schema `s`.
//
// The span contained both the pre-patch body (squashed(from, to, ...)) and
// the post-patch one; collapsed to the post-patch form delegating to
// squashed_v2(), which no longer needs a "from" schema.
mutation_partition partition_entry::squashed(const schema& s, is_evictable evictable)
{
    return squashed_v2(s, evictable).as_mutation_partition(s);
}
|
||||
|
||||
void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker)
|
||||
void partition_entry::upgrade(logalloc::region& r, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker)
|
||||
{
|
||||
auto new_version = current_allocator().construct<partition_version>(squashed(from, to, is_evictable(bool(tracker))));
|
||||
auto old_version = &*_version;
|
||||
set_version(new_version);
|
||||
if (tracker) {
|
||||
tracker->insert(*new_version);
|
||||
}
|
||||
remove_or_mark_as_unique_owner(old_version, &cleaner);
|
||||
with_allocator(r.allocator(), [&] {
|
||||
auto phase = partition_snapshot::max_phase;
|
||||
if (_snapshot) {
|
||||
phase = _snapshot->_phase;
|
||||
}
|
||||
// The destruction of this snapshot pointer will trigger a background merge
|
||||
// of the old version into the new version.
|
||||
partition_snapshot_ptr snp = read(r, cleaner, tracker, phase);
|
||||
add_version(*to, tracker);
|
||||
});
|
||||
}
|
||||
|
||||
partition_snapshot_ptr partition_entry::read(logalloc::region& r,
|
||||
mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase)
|
||||
mutation_cleaner& cleaner, cache_tracker* tracker, partition_snapshot::phase_type phase)
|
||||
{
|
||||
if (_snapshot) {
|
||||
if (_snapshot->_phase == phase) {
|
||||
@@ -594,12 +658,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
|
||||
return snp;
|
||||
} else { // phase > _snapshot->_phase
|
||||
with_allocator(r.allocator(), [&] {
|
||||
add_version(*entry_schema, tracker);
|
||||
add_version(*get_schema(), tracker);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
|
||||
auto snp = make_lw_shared<partition_snapshot>(r, cleaner, this, tracker, phase);
|
||||
_snapshot = snp.get();
|
||||
return partition_snapshot_ptr(std::move(snp));
|
||||
}
|
||||
@@ -639,7 +703,7 @@ std::ostream& operator<<(std::ostream& out, const partition_entry::printer& p) {
|
||||
}
|
||||
out << ") ";
|
||||
}
|
||||
out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(p._schema, v->partition());
|
||||
out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(*v->get_schema(), v->partition());
|
||||
v = v->next();
|
||||
first = false;
|
||||
}
|
||||
|
||||
@@ -80,21 +80,7 @@ class static_row;
|
||||
// When the partition_snapshot is destroyed partition_versions are squashed
|
||||
// together to minimize the amount of elements on the list.
|
||||
//
|
||||
// Scene IV. Schema upgrade
|
||||
// pv pv --- pv
|
||||
// ^ ^ ^
|
||||
// | | |
|
||||
// pe ps(u) ps
|
||||
// When there is a schema upgrade the list of partition versions pointed to
|
||||
// by partition_entry is replaced by a new single partition_version that is a
|
||||
// result of squashing and upgrading the old versions.
|
||||
// Old versions not used by any partition snapshot are removed. The first
|
||||
// partition snapshot on the list is marked as unique which means that upon
|
||||
// its destruction it won't attempt to squash versions but instead remove
|
||||
// the unused ones and pass the "unique owner" mark the next snapshot on the
|
||||
// list (if there is any).
|
||||
//
|
||||
// Scene V. partition_entry eviction
|
||||
// Scene IV. partition_entry eviction
|
||||
// pv
|
||||
// ^
|
||||
// |
|
||||
@@ -104,11 +90,110 @@ class static_row;
|
||||
// upgrade scenario. The unused ones are destroyed right away and the first
|
||||
// snapshot on the list is marked as unique owner so that on its destruction
|
||||
// it continues removal of the partition versions.
|
||||
//
|
||||
|
||||
// Schema upgrades
|
||||
//
|
||||
// After a schema change (e.g. a column is removed), the layout of existing
|
||||
// rows in memory becomes outdated and has to be adjusted before they are
|
||||
// emitted by a query expecting the newer schema.
|
||||
//
|
||||
// Rows can be upgraded on the fly during queries. But upgrades have a high CPU
|
||||
// cost, so we want them to happen only once. The upgraded row should be saved
|
||||
// in memory so that future queries don't have to upgrade it again.
|
||||
// And it should replace the old row as soon as possible (when
|
||||
// the row is no longer reachable through the old schema) to conserve memory.
|
||||
//
|
||||
// This behavior is akin to MVCC. A schema upgrade can be thought of as a
|
||||
// special kind of update which affects all rows, and the MVCC machinery can be
|
||||
// naturally hijacked to implement it.
|
||||
//
|
||||
// Currently, we do it as follows:
|
||||
//
|
||||
// - Each MVCC version has its own schema pointer. Versions in the same chain
|
||||
// can be of different schemas.
|
||||
//
|
||||
// - The schema of a partition entry is defined as the schema of the newest version.
|
||||
// A partition entry upgrade is performed simply by inserting a new empty version with
|
||||
// the new schema. (And triggering a background version merge by creating and immediately
|
||||
// destroying a snapshot pointing at the previous newest version).
|
||||
// Due to this, schemas of versions in the chain are ordered chronologically.
|
||||
// (The order is important because it's forbidden to upgrade to an older version,
|
||||
// because that's lossy -- e.g. a new column can be lost).
|
||||
//
|
||||
// - On read, the cursor upgrades rows on the fly to the cursor's schema.
|
||||
// If the cursor reads the latest version, the upgraded rows are written to the latest
|
||||
// version.
|
||||
//
|
||||
// - When versions are merged, rows are upgraded to the newer schema, the result of the
|
||||
// merge has the newer schema.
|
||||
//
|
||||
// This one is tricky. A natural idea is to merge older versions into the newer version,
|
||||
// (upgrading rows when moving/copying them between versions), so that after a merge
|
||||
// only the new version is left. But usually we want to merge in the other direction.
|
||||
//
|
||||
// (When a database write arrives, we want to merge it into the existing
|
||||
// older version, so that it has a cost proportional to the size of the
|
||||
// write, not to the size of the existing version, which can be arbitrarily
|
||||
// large. Doing otherwise would invite quadratic behaviour)
|
||||
//
|
||||
// The merging algorithm is already very complicated and making it work in both
|
||||
// directions (or adding a separate algorithm specifically for upgrades) would
|
||||
// complicate things even further.
|
||||
//
|
||||
// So instead, when two versions of different schema are merged, the older version
|
||||
// (which also has the older schema) is first upgraded to the newer schema in a special
|
||||
// upgrade process which only uses regular newer-into-older merging.
|
||||
// This is done by appending a fresh empty version with the newer schema after
|
||||
// the version-to-be-upgraded, and merging the version-to-be-upgraded into the new one.
|
||||
// In the end, only the new version with the newer schema is left.
|
||||
//
|
||||
// Technically the above procedure temporarily violates the rule that schema versions
|
||||
// in the chain are ordered chronologically (which is needed for correctness).
|
||||
// So while the above is happening, the version-to-be-upgraded has _is_being_upgraded set.
|
||||
// A version with _is_being_upgraded is understood to be special in that its
|
||||
// schema is older than its next neighbour's, and care is taken so that the
|
||||
// neighbour isn't recursively downgraded back to the older schema.
|
||||
// A version with _is_being_upgraded can be viewed together with its next() as
|
||||
// conceptually a single version with the schema of next().
|
||||
//
|
||||
// The typical upgrade sequence, illustrated:
|
||||
// 1. Initial state:
|
||||
// pv1 (s1)
|
||||
// ^
|
||||
// |
|
||||
// pe
|
||||
// 2. partition_entry::upgrade(s2) is called. Empty pv2 is added.
|
||||
// pv2 (s2) -- pv1 (s1)
|
||||
// ^ ^
|
||||
// | |
|
||||
// pe ps1 (created and instantly dropped, so that merging is initiated)
|
||||
// 3. Some time later, mutation_cleaner calls merge_partition_versions(ps1).
|
||||
// Merge of pv2 and pv1 is attempted.
|
||||
// Schemas differ, so instead an upgrade of pv1 is initiated. Empty pv1' is added.
|
||||
// pv1 is now conceptually "owned" by pv1', and no snapshot is allowed to point to it
|
||||
// after this point.
|
||||
// pv2 (s2) -- pv1 (s1, _is_being_upgraded) -- pv1' (s2)
|
||||
// ^ ^
|
||||
// | |
|
||||
// pe ps1
|
||||
// 4. Eventually pv1 is fully upgrade-merged into pv1' and destroyed.
|
||||
// pv2 (s2) -- pv1' (s2)
|
||||
// ^ ^
|
||||
// | |
|
||||
// pe ps1
|
||||
// 5. Upgrade over, further merge proceeds as usual. Eventually pv2 is fully merged into pv1'.
|
||||
// pv1' (s2)
|
||||
// ^
|
||||
// |
|
||||
// pe
|
||||
|
||||
class partition_version_ref;
|
||||
|
||||
class partition_version : public anchorless_list_base_hook<partition_version> {
|
||||
partition_version_ref* _backref = nullptr;
|
||||
schema_ptr _schema;
|
||||
bool _is_being_upgraded = false;
|
||||
mutation_partition_v2 _partition;
|
||||
|
||||
friend class partition_version_ref;
|
||||
@@ -120,9 +205,18 @@ public:
|
||||
}
|
||||
|
||||
explicit partition_version(schema_ptr s) noexcept
|
||||
: _partition(std::move(s)) { }
|
||||
explicit partition_version(mutation_partition_v2 mp) noexcept
|
||||
: _partition(std::move(mp)) { }
|
||||
: _schema(std::move(s))
|
||||
, _partition(*_schema)
|
||||
{
|
||||
assert(_schema);
|
||||
}
|
||||
explicit partition_version(mutation_partition_v2 mp, schema_ptr s) noexcept
|
||||
: _schema(std::move(s))
|
||||
, _partition(std::move(mp))
|
||||
{
|
||||
assert(_schema);
|
||||
}
|
||||
|
||||
partition_version(partition_version&& pv) noexcept;
|
||||
partition_version& operator=(partition_version&& pv) noexcept;
|
||||
~partition_version();
|
||||
@@ -138,7 +232,9 @@ public:
|
||||
bool is_referenced_from_entry() const;
|
||||
partition_version_ref& back_reference() const { return *_backref; }
|
||||
|
||||
size_t size_in_allocator(const schema& s, allocation_strategy& allocator) const;
|
||||
size_t size_in_allocator(allocation_strategy& allocator) const;
|
||||
|
||||
const schema_ptr& get_schema() const noexcept { return _schema; }
|
||||
};
|
||||
|
||||
using partition_version_range = anchorless_list_base_hook<partition_version>::range;
|
||||
@@ -256,7 +352,6 @@ public:
|
||||
}
|
||||
};
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
// Either _version or _entry is non-null.
|
||||
partition_version_ref _version;
|
||||
partition_entry* _entry;
|
||||
@@ -270,13 +365,12 @@ private:
|
||||
friend class partition_entry;
|
||||
friend class mutation_cleaner_impl;
|
||||
public:
|
||||
explicit partition_snapshot(schema_ptr s,
|
||||
logalloc::region& region,
|
||||
explicit partition_snapshot(logalloc::region& region,
|
||||
mutation_cleaner& cleaner,
|
||||
partition_entry* entry,
|
||||
cache_tracker* tracker, // non-null for evictable snapshots
|
||||
phase_type phase = default_phase)
|
||||
: _schema(std::move(s)), _entry(entry), _phase(phase), _region(®ion), _cleaner(&cleaner), _tracker(tracker) { }
|
||||
: _entry(entry), _phase(phase), _region(®ion), _cleaner(&cleaner), _tracker(tracker) { }
|
||||
partition_snapshot(const partition_snapshot&) = delete;
|
||||
partition_snapshot(partition_snapshot&&) = delete;
|
||||
partition_snapshot& operator=(const partition_snapshot&) = delete;
|
||||
@@ -358,7 +452,7 @@ public:
|
||||
return !version()->next();
|
||||
}
|
||||
|
||||
const schema_ptr& schema() const { return _schema; }
|
||||
const schema_ptr& schema() const { return version()->get_schema(); }
|
||||
logalloc::region& region() const { return *_region; }
|
||||
cache_tracker* tracker() const { return _tracker; }
|
||||
mutation_cleaner& cleaner() { return *_cleaner; }
|
||||
@@ -439,7 +533,7 @@ public:
|
||||
// Constructs a non-evictable entry holding empty partition
|
||||
partition_entry() = default;
|
||||
// Constructs a non-evictable entry
|
||||
explicit partition_entry(mutation_partition_v2);
|
||||
partition_entry(const schema&, mutation_partition_v2);
|
||||
partition_entry(const schema&, mutation_partition);
|
||||
// Returns a reference to partition_entry containing given pv,
|
||||
// assuming pv.is_referenced_from_entry().
|
||||
@@ -523,24 +617,28 @@ public:
|
||||
const schema& mp_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
|
||||
// Adds mutation_partition represented by "other" to the one represented
|
||||
// Adds mutation_partition represented by "pe" to the one represented
|
||||
// by this entry.
|
||||
// This entry must be evictable.
|
||||
// "pe" must be fully-continuous.
|
||||
// (Alternatively: applies the "pe" memtable entry to "this" cache entry.)
|
||||
//
|
||||
// The argument must be fully-continuous.
|
||||
//
|
||||
// The continuity of this entry remains unchanged. Information from "other"
|
||||
// The continuity of this entry remains unchanged. Information from "pe"
|
||||
// which is incomplete in this instance is dropped. In other words, this
|
||||
// performs set intersection on continuity information, drops information
|
||||
// which falls outside of the continuity range, and applies regular merging
|
||||
// rules for the rest.
|
||||
// (Rationale: updates from the memtable are only applied to intervals
|
||||
// which were already in cache. The cache treats the entire sstable set as a
|
||||
// single source -- it isn't able to store partial information only from a
|
||||
// single sstable.)
|
||||
//
|
||||
// Weak exception guarantees.
|
||||
// If an exception is thrown this and pe will be left in some valid states
|
||||
// If an exception is thrown, "this" and "pe" will be left in some valid states
|
||||
// such that if the operation is retried (possibly many times) and eventually
|
||||
// succeeds the result will be as if the first attempt didn't fail.
|
||||
//
|
||||
// The schema of pe must conform to s.
|
||||
// The schema of "pe" must conform to "s".
|
||||
//
|
||||
// Returns a coroutine object representing the operation.
|
||||
// The coroutine must be resumed with the region being unlocked.
|
||||
@@ -581,27 +679,27 @@ public:
|
||||
return *_version;
|
||||
}
|
||||
|
||||
mutation_partition_v2 squashed(schema_ptr from, schema_ptr to, is_evictable);
|
||||
mutation_partition_v2 squashed_v2(const schema& to, is_evictable);
|
||||
mutation_partition squashed(const schema&, is_evictable);
|
||||
tombstone partition_tombstone() const;
|
||||
|
||||
// needs to be called with reclaiming disabled
|
||||
// Must not be called when is_locked().
|
||||
void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*);
|
||||
void upgrade(logalloc::region& r, schema_ptr to, mutation_cleaner&, cache_tracker*);
|
||||
|
||||
const schema_ptr& get_schema() const noexcept { return _version->get_schema(); }
|
||||
|
||||
// Snapshots with different values of phase will point to different partition_version objects.
|
||||
// When is_locked(), read() can only be called with a phase which is <= the phase of the current snapshot.
|
||||
partition_snapshot_ptr read(logalloc::region& region,
|
||||
mutation_cleaner&,
|
||||
schema_ptr entry_schema,
|
||||
cache_tracker*,
|
||||
partition_snapshot::phase_type phase = partition_snapshot::default_phase);
|
||||
|
||||
class printer {
|
||||
const schema& _schema;
|
||||
const partition_entry& _partition_entry;
|
||||
public:
|
||||
printer(const schema& s, const partition_entry& pe) : _schema(s), _partition_entry(pe) { }
|
||||
printer(const partition_entry& pe) : _partition_entry(pe) { }
|
||||
printer(const printer&) = delete;
|
||||
printer(printer&&) = delete;
|
||||
|
||||
|
||||
@@ -111,6 +111,7 @@ class partition_snapshot_row_cursor final {
|
||||
mutation_partition::rows_type::iterator it;
|
||||
utils::immutable_collection<mutation_partition::rows_type> rows;
|
||||
int version_no;
|
||||
const schema* schema;
|
||||
bool unique_owner = false;
|
||||
is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain).
|
||||
|
||||
@@ -202,7 +203,7 @@ class partition_snapshot_row_cursor final {
|
||||
position_in_version& v = _heap.back();
|
||||
rows_entry& e = *v.it;
|
||||
if (_digest_requested) {
|
||||
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
|
||||
e.row().cells().prepare_hash(*v.schema, column_kind::regular_column);
|
||||
}
|
||||
_dummy &= bool(e.dummy());
|
||||
_continuous |= bool(v.continuous);
|
||||
@@ -280,7 +281,7 @@ class partition_snapshot_row_cursor final {
|
||||
rt = pos->range_tombstone();
|
||||
}
|
||||
if (pos) [[likely]] {
|
||||
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont, rt});
|
||||
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, cont, rt});
|
||||
}
|
||||
} else {
|
||||
if (_reversed) [[unlikely]] {
|
||||
@@ -293,7 +294,7 @@ class partition_snapshot_row_cursor final {
|
||||
_background_continuity = true; // Default continuity past the last entry
|
||||
}
|
||||
if (pos) [[likely]] {
|
||||
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, is_continuous::yes});
|
||||
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, is_continuous::yes});
|
||||
}
|
||||
}
|
||||
++version_no;
|
||||
@@ -469,7 +470,7 @@ public:
|
||||
}
|
||||
} else if (match) {
|
||||
_current_row.insert(_current_row.begin(), position_in_version{
|
||||
it, std::move(rows), 0, _unique_owner, cont, rt});
|
||||
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
|
||||
if (heap_i != _heap.end()) {
|
||||
_heap.erase(heap_i);
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
@@ -482,7 +483,7 @@ public:
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
} else {
|
||||
_heap.push_back(position_in_version{
|
||||
it, std::move(rows), 0, _unique_owner, cont, rt});
|
||||
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
|
||||
boost::range::push_heap(_heap, heap_less);
|
||||
}
|
||||
}
|
||||
@@ -556,9 +557,9 @@ public:
|
||||
clustering_row row() const {
|
||||
// Note: if the precondition ("cursor is valid and pointing at a row") is fulfilled
|
||||
// then _current_row is not empty, so the below is valid.
|
||||
clustering_row cr(key(), deletable_row(_schema, _current_row[0].it->row()));
|
||||
clustering_row cr(key(), deletable_row(_schema, *_current_row[0].schema, _current_row[0].it->row()));
|
||||
for (size_t i = 1; i < _current_row.size(); ++i) {
|
||||
cr.apply(_schema, _current_row[i].it->row());
|
||||
cr.apply(_schema, *_current_row[i].schema, _current_row[i].it->row());
|
||||
}
|
||||
return cr;
|
||||
}
|
||||
@@ -571,32 +572,23 @@ public:
|
||||
// Can be called only when cursor is valid and pointing at a row.
|
||||
// Monotonic exception guarantees.
|
||||
template <typename Consumer>
|
||||
requires std::is_invocable_v<Consumer, deletable_row>
|
||||
requires std::is_invocable_v<Consumer, deletable_row&&>
|
||||
void consume_row(Consumer&& consumer) {
|
||||
for (position_in_version& v : _current_row) {
|
||||
if (v.unique_owner) {
|
||||
if (v.unique_owner && (_schema.version() == v.schema->version())) [[likely]] {
|
||||
consumer(std::move(v.it->row()));
|
||||
} else {
|
||||
consumer(deletable_row(_schema, v.it->row()));
|
||||
consumer(deletable_row(_schema, *v.schema, v.it->row()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Can be called only when cursor is valid and pointing at a row.
|
||||
template <typename Consumer>
|
||||
requires std::is_invocable_v<Consumer, const deletable_row&>
|
||||
void consume_row(Consumer&& consumer) const {
|
||||
for (const position_in_version& v : _current_row) {
|
||||
consumer(v.it->row());
|
||||
}
|
||||
}
|
||||
|
||||
// Returns memory footprint of row entries under the cursor.
|
||||
// Can be called only when cursor is valid and pointing at a row.
|
||||
size_t memory_usage() const {
|
||||
size_t result = 0;
|
||||
for (const position_in_version& v : _current_row) {
|
||||
result += v.it->memory_usage(_schema);
|
||||
result += v.it->memory_usage(*v.schema);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -631,7 +623,7 @@ public:
|
||||
is_dummy(!_position.is_clustering_row()), is_continuous::no));
|
||||
} else {
|
||||
return alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].it));
|
||||
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].schema, *_current_row[0].it));
|
||||
}
|
||||
}();
|
||||
rows_entry& re = *e;
|
||||
@@ -707,7 +699,7 @@ public:
|
||||
}
|
||||
}
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(_schema, pos,
|
||||
current_allocator().construct<rows_entry>(*_snp.version()->get_schema(), pos,
|
||||
is_dummy(!pos.is_clustering_row()),
|
||||
is_continuous::no));
|
||||
if (latest_i && latest_i->continuous()) {
|
||||
|
||||
@@ -225,7 +225,7 @@ memtable::find_or_create_partition(const dht::decorated_key& key) {
|
||||
if (i == partitions.end() || !hint.match) {
|
||||
partitions_type::iterator entry = partitions.emplace_before(i,
|
||||
key.token().raw(), hint,
|
||||
_schema, dht::decorated_key(key), mutation_partition(_schema));
|
||||
_schema, dht::decorated_key(key), mutation_partition(*_schema));
|
||||
++nr_partitions;
|
||||
++_table_stats.memtable_partition_insertions;
|
||||
if (!hint.emplace_keeps_iterators()) {
|
||||
@@ -696,7 +696,7 @@ public:
|
||||
};
|
||||
|
||||
partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
|
||||
return _pe.read(mtbl.region(), mtbl.cleaner(), _schema, no_cache_tracker);
|
||||
return _pe.read(mtbl.region(), mtbl.cleaner(), no_cache_tracker);
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2_opt
|
||||
@@ -793,7 +793,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_han
|
||||
with_allocator(allocator(), [this, &m, &m_schema] {
|
||||
_allocating_section(*this, [&, this] {
|
||||
auto& p = find_or_create_partition_slow(m.key());
|
||||
mutation_partition mp(m_schema);
|
||||
mutation_partition mp(*m_schema);
|
||||
partition_builder pb(*m_schema, mp);
|
||||
m.partition().accept(*m_schema, pb);
|
||||
_stats_collector.update(*m_schema, mp);
|
||||
@@ -821,8 +821,7 @@ mutation_source memtable::as_data_source() {
|
||||
}
|
||||
|
||||
memtable_entry::memtable_entry(memtable_entry&& o) noexcept
|
||||
: _schema(std::move(o._schema))
|
||||
, _key(std::move(o._key))
|
||||
: _key(std::move(o._key))
|
||||
, _pe(std::move(o._pe))
|
||||
, _flags(o._flags)
|
||||
{ }
|
||||
@@ -839,19 +838,16 @@ bool memtable::is_flushed() const noexcept {
|
||||
return bool(_underlying);
|
||||
}
|
||||
|
||||
void memtable_entry::upgrade_schema(const schema_ptr& s, mutation_cleaner& cleaner) {
|
||||
if (_schema != s) {
|
||||
partition().upgrade(_schema, s, cleaner, no_cache_tracker);
|
||||
_schema = s;
|
||||
void memtable_entry::upgrade_schema(logalloc::region& r, const schema_ptr& s, mutation_cleaner& cleaner) {
|
||||
if (schema() != s) {
|
||||
partition().upgrade(r, s, cleaner, no_cache_tracker);
|
||||
}
|
||||
}
|
||||
|
||||
void memtable::upgrade_entry(memtable_entry& e) {
|
||||
if (e._schema != _schema) {
|
||||
if (e.schema() != _schema) {
|
||||
assert(!reclaiming_enabled());
|
||||
with_allocator(allocator(), [this, &e] {
|
||||
e.upgrade_schema(_schema, cleaner());
|
||||
});
|
||||
e.upgrade_schema(region(), _schema, cleaner());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -870,7 +866,7 @@ std::ostream& operator<<(std::ostream& out, memtable& mt) {
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const memtable_entry& mt) {
|
||||
return out << "{" << mt.key() << ": " << partition_entry::printer(*mt.schema(), mt.partition()) << "}";
|
||||
return out << "{" << mt.key() << ": " << partition_entry::printer(mt.partition()) << "}";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -33,7 +33,6 @@ namespace bi = boost::intrusive;
|
||||
namespace replica {
|
||||
|
||||
class memtable_entry {
|
||||
schema_ptr _schema;
|
||||
dht::decorated_key _key;
|
||||
partition_entry _pe;
|
||||
struct {
|
||||
@@ -52,9 +51,8 @@ public:
|
||||
friend class memtable;
|
||||
|
||||
memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p)
|
||||
: _schema(std::move(s))
|
||||
, _key(std::move(key))
|
||||
, _pe(*_schema, std::move(p))
|
||||
: _key(std::move(key))
|
||||
, _pe(*s, std::move(p))
|
||||
{ }
|
||||
|
||||
memtable_entry(memtable_entry&& o) noexcept;
|
||||
@@ -65,13 +63,12 @@ public:
|
||||
dht::decorated_key& key() { return _key; }
|
||||
const partition_entry& partition() const { return _pe; }
|
||||
partition_entry& partition() { return _pe; }
|
||||
const schema_ptr& schema() const { return _schema; }
|
||||
schema_ptr& schema() { return _schema; }
|
||||
const schema_ptr& schema() const { return _pe.get_schema(); }
|
||||
partition_snapshot_ptr snapshot(memtable& mtbl);
|
||||
|
||||
// Makes the entry conform to given schema.
|
||||
// Must be called under allocating section of the region which owns the entry.
|
||||
void upgrade_schema(const schema_ptr&, mutation_cleaner&);
|
||||
void upgrade_schema(logalloc::region&, const schema_ptr&, mutation_cleaner&);
|
||||
|
||||
size_t external_memory_usage_without_rows() const {
|
||||
return _key.key().external_memory_usage();
|
||||
@@ -86,7 +83,7 @@ public:
|
||||
size_t size_in_allocator(allocation_strategy& allocator) {
|
||||
auto size = size_in_allocator_without_rows(allocator);
|
||||
for (auto&& v : _pe.versions()) {
|
||||
size += v.size_in_allocator(*_schema, allocator);
|
||||
size += v.size_in_allocator(allocator);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
34
row_cache.cc
34
row_cache.cc
@@ -846,7 +846,7 @@ cache_entry& row_cache::find_or_create_incomplete(const partition_start& ps, row
|
||||
|
||||
cache_entry& row_cache::find_or_create_missing(const dht::decorated_key& key) {
|
||||
return do_find_or_create_entry(key, nullptr, [&] (auto i, const partitions_type::bound_hint& hint) {
|
||||
mutation_partition mp(_schema);
|
||||
mutation_partition mp(*_schema);
|
||||
bool cont = i->continuous();
|
||||
partitions_type::iterator entry = _partitions.emplace_before(i, key.token().raw(), hint,
|
||||
_schema, key, std::move(mp));
|
||||
@@ -1024,9 +1024,9 @@ future<> row_cache::update(external_updater eu, replica::memtable& m) {
|
||||
if (cache_i != partitions_end() && hint.match) {
|
||||
cache_entry& entry = *cache_i;
|
||||
upgrade_entry(entry);
|
||||
assert(entry._schema == _schema);
|
||||
assert(entry.schema() == _schema);
|
||||
_tracker.on_partition_merge();
|
||||
mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner());
|
||||
mem_e.upgrade_schema(_tracker.region(), _schema, _tracker.memtable_cleaner());
|
||||
return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(),
|
||||
alloc, _tracker.region(), _tracker, _underlying_phase, acc);
|
||||
} else if (cache_i->continuous()
|
||||
@@ -1035,10 +1035,10 @@ future<> row_cache::update(external_updater eu, replica::memtable& m) {
|
||||
// Partition is absent in underlying. First, insert a neutral partition entry.
|
||||
partitions_type::iterator entry = _partitions.emplace_before(cache_i, mem_e.key().token().raw(), hint,
|
||||
cache_entry::evictable_tag(), _schema, dht::decorated_key(mem_e.key()),
|
||||
partition_entry::make_evictable(*_schema, mutation_partition(_schema)));
|
||||
partition_entry::make_evictable(*_schema, mutation_partition(*_schema)));
|
||||
entry->set_continuous(cache_i->continuous());
|
||||
_tracker.insert(*entry);
|
||||
mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner());
|
||||
mem_e.upgrade_schema(_tracker.region(), _schema, _tracker.memtable_cleaner());
|
||||
return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(),
|
||||
alloc, _tracker.region(), _tracker, _underlying_phase, acc);
|
||||
} else {
|
||||
@@ -1194,8 +1194,7 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
|
||||
}
|
||||
|
||||
cache_entry::cache_entry(cache_entry&& o) noexcept
|
||||
: _schema(std::move(o._schema))
|
||||
, _key(std::move(o._key))
|
||||
: _key(std::move(o._key))
|
||||
, _pe(std::move(o._pe))
|
||||
, _flags(o._flags)
|
||||
{
|
||||
@@ -1296,9 +1295,9 @@ flat_mutation_reader_v2 cache_entry::read(row_cache& rc, std::unique_ptr<read_co
|
||||
|
||||
// Assumes reader is in the corresponding partition
|
||||
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, read_context& reader) {
|
||||
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, reader.phase());
|
||||
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, reader.native_slice(), _key.key());
|
||||
schema_ptr entry_schema = to_query_domain(reader.slice(), _schema);
|
||||
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker, reader.phase());
|
||||
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*schema(), reader.native_slice(), _key.key());
|
||||
schema_ptr entry_schema = to_query_domain(reader.slice(), schema());
|
||||
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, reader, std::move(snp));
|
||||
r.upgrade_schema(to_query_domain(reader.slice(), rc.schema()));
|
||||
r.upgrade_schema(reader.schema());
|
||||
@@ -1306,10 +1305,10 @@ flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, read_context& reader
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
|
||||
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, unique_ctx->phase());
|
||||
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, unique_ctx->native_slice(), _key.key());
|
||||
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker, unique_ctx->phase());
|
||||
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*schema(), unique_ctx->native_slice(), _key.key());
|
||||
schema_ptr reader_schema = unique_ctx->schema();
|
||||
schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), _schema);
|
||||
schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), schema());
|
||||
auto rc_schema = to_query_domain(unique_ctx->slice(), rc.schema());
|
||||
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp));
|
||||
r.upgrade_schema(rc_schema);
|
||||
@@ -1322,13 +1321,10 @@ const schema_ptr& row_cache::schema() const {
|
||||
}
|
||||
|
||||
void row_cache::upgrade_entry(cache_entry& e) {
|
||||
if (e._schema != _schema && !e.partition().is_locked()) {
|
||||
if (e.schema() != _schema && !e.partition().is_locked()) {
|
||||
auto& r = _tracker.region();
|
||||
assert(!r.reclaiming_enabled());
|
||||
with_allocator(r.allocator(), [this, &e] {
|
||||
e.partition().upgrade(e._schema, _schema, _tracker.cleaner(), &_tracker);
|
||||
e._schema = _schema;
|
||||
});
|
||||
e.partition().upgrade(r, _schema, _tracker.cleaner(), &_tracker);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1371,6 +1367,6 @@ std::ostream& operator<<(std::ostream& out, const cache_entry& e) {
|
||||
return out << "{cache_entry: " << e.position()
|
||||
<< ", cont=" << e.continuous()
|
||||
<< ", dummy=" << e.is_dummy_entry()
|
||||
<< ", " << partition_entry::printer(*e.schema(), e.partition())
|
||||
<< ", " << partition_entry::printer(e.partition())
|
||||
<< "}";
|
||||
}
|
||||
|
||||
12
row_cache.hh
12
row_cache.hh
@@ -50,7 +50,6 @@ class lsa_manager;
|
||||
//
|
||||
// TODO: Make memtables use this format too.
|
||||
class cache_entry {
|
||||
schema_ptr _schema;
|
||||
dht::decorated_key _key;
|
||||
partition_entry _pe;
|
||||
// True when we know that there is nothing between this entry and the previous one in cache
|
||||
@@ -86,9 +85,8 @@ public:
|
||||
}
|
||||
|
||||
cache_entry(schema_ptr s, const dht::decorated_key& key, const mutation_partition& p)
|
||||
: _schema(std::move(s))
|
||||
, _key(key)
|
||||
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(*_schema, p)))
|
||||
: _key(key)
|
||||
, _pe(partition_entry::make_evictable(*s, mutation_partition(*s, p)))
|
||||
{ }
|
||||
|
||||
cache_entry(schema_ptr s, dht::decorated_key&& key, mutation_partition&& p)
|
||||
@@ -99,8 +97,7 @@ public:
|
||||
// It is assumed that pe is fully continuous
|
||||
// pe must be evictable.
|
||||
cache_entry(evictable_tag, schema_ptr s, dht::decorated_key&& key, partition_entry&& pe) noexcept
|
||||
: _schema(std::move(s))
|
||||
, _key(std::move(key))
|
||||
: _key(std::move(key))
|
||||
, _pe(std::move(pe))
|
||||
{ }
|
||||
|
||||
@@ -130,8 +127,7 @@ public:
|
||||
|
||||
const partition_entry& partition() const noexcept { return _pe; }
|
||||
partition_entry& partition() { return _pe; }
|
||||
const schema_ptr& schema() const noexcept { return _schema; }
|
||||
schema_ptr& schema() noexcept { return _schema; }
|
||||
const schema_ptr& schema() const noexcept { return _pe.get_schema(); }
|
||||
flat_mutation_reader_v2 read(row_cache&, cache::read_context&);
|
||||
flat_mutation_reader_v2 read(row_cache&, std::unique_ptr<cache::read_context>);
|
||||
flat_mutation_reader_v2 read(row_cache&, cache::read_context&, utils::phased_barrier::phase_type);
|
||||
|
||||
@@ -4509,7 +4509,7 @@ public:
|
||||
const mutation& m = z.get<1>().mut;
|
||||
for (const version& v : z.get<0>()) {
|
||||
auto diff = v.par
|
||||
? m.partition().difference(schema, (co_await v.par->mut().unfreeze_gently(schema)).partition())
|
||||
? m.partition().difference(*schema, (co_await v.par->mut().unfreeze_gently(schema)).partition())
|
||||
: mutation_partition(*schema, m.partition());
|
||||
std::optional<mutation> mdiff;
|
||||
if (!diff.empty()) {
|
||||
|
||||
@@ -190,7 +190,7 @@ public:
|
||||
static partition_snapshot_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) {
|
||||
return rc._read_section(rc._tracker.region(), [&] {
|
||||
cache_entry& e = rc.lookup(dk);
|
||||
return e.partition().read(rc._tracker.region(), rc._tracker.cleaner(), e.schema(), &rc._tracker);
|
||||
return e.partition().read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -270,7 +270,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
|
||||
|
||||
// Difference
|
||||
|
||||
m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m2.partition()));
|
||||
m = mutation(s, m1.decorated_key(), m1.partition().difference(*s, m2.partition()));
|
||||
ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
{
|
||||
@@ -287,7 +287,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
|
||||
verify_shard_order(ccv);
|
||||
}
|
||||
|
||||
m = mutation(s, m1.decorated_key(), m2.partition().difference(s, m1.partition()));
|
||||
m = mutation(s, m1.decorated_key(), m2.partition().difference(*s, m1.partition()));
|
||||
ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
{
|
||||
@@ -304,11 +304,11 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
|
||||
verify_shard_order(ccv);
|
||||
}
|
||||
|
||||
m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m3.partition()));
|
||||
m = mutation(s, m1.decorated_key(), m1.partition().difference(*s, m3.partition()));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0);
|
||||
BOOST_REQUIRE(m.partition().static_row().empty());
|
||||
|
||||
m = mutation(s, m1.decorated_key(), m3.partition().difference(s, m1.partition()));
|
||||
m = mutation(s, m1.decorated_key(), m3.partition().difference(*s, m1.partition()));
|
||||
ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(!ac.is_live());
|
||||
|
||||
|
||||
@@ -1103,7 +1103,7 @@ SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
|
||||
m2 = mutation_partition_v2(s, second.partition());
|
||||
});
|
||||
auto check = defer([&] {
|
||||
m.apply_monotonically(s, std::move(m2), no_cache_tracker, app_stats, is_evictable::no);
|
||||
m.apply(s, std::move(m2));
|
||||
assert_that(target.schema(), m).is_equal_to_compacted(expected.partition());
|
||||
});
|
||||
auto continuity_check = defer([&] {
|
||||
@@ -1120,7 +1120,7 @@ SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
|
||||
}
|
||||
});
|
||||
apply_resume res;
|
||||
if (m.apply_monotonically(s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) {
|
||||
if (m.apply_monotonically(s, s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) {
|
||||
continuity_check.cancel();
|
||||
seastar::memory::local_failure_injector().cancel();
|
||||
}
|
||||
@@ -1195,7 +1195,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
m12.apply(m1);
|
||||
m12.apply(m2);
|
||||
|
||||
auto m2_1 = m2.partition().difference(s, m1.partition());
|
||||
auto m2_1 = m2.partition().difference(*s, m1.partition());
|
||||
BOOST_REQUIRE_EQUAL(m2_1.partition_tombstone(), tombstone());
|
||||
BOOST_REQUIRE(!m2_1.static_row().size());
|
||||
BOOST_REQUIRE(!m2_1.find_row(*s, ckey1));
|
||||
@@ -1212,7 +1212,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
m12_1.partition().apply(*s, m2_1, *s, app_stats);
|
||||
BOOST_REQUIRE_EQUAL(m12, m12_1);
|
||||
|
||||
auto m1_2 = m1.partition().difference(s, m2.partition());
|
||||
auto m1_2 = m1.partition().difference(*s, m2.partition());
|
||||
BOOST_REQUIRE_EQUAL(m1_2.partition_tombstone(), m12.partition().partition_tombstone());
|
||||
BOOST_REQUIRE(m1_2.find_row(*s, ckey1));
|
||||
BOOST_REQUIRE(m1_2.find_row(*s, ckey2));
|
||||
@@ -1230,10 +1230,10 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
m12_2.partition().apply(*s, m1_2, *s, app_stats);
|
||||
BOOST_REQUIRE_EQUAL(m12, m12_2);
|
||||
|
||||
auto m3_12 = m3.partition().difference(s, m12.partition());
|
||||
auto m3_12 = m3.partition().difference(*s, m12.partition());
|
||||
BOOST_REQUIRE(m3_12.empty());
|
||||
|
||||
auto m12_3 = m12.partition().difference(s, m3.partition());
|
||||
auto m12_3 = m12.partition().difference(*s, m3.partition());
|
||||
BOOST_REQUIRE_EQUAL(m12_3.partition_tombstone(), m12.partition().partition_tombstone());
|
||||
|
||||
mutation m123(s, partition_key::from_single_value(*s, "key1"));
|
||||
@@ -1364,13 +1364,13 @@ SEASTAR_TEST_CASE(test_query_digest) {
|
||||
|
||||
auto m3 = m2;
|
||||
{
|
||||
auto diff = m1.partition().difference(s, m2.partition());
|
||||
auto diff = m1.partition().difference(*s, m2.partition());
|
||||
m3.partition().apply(*m3.schema(), std::move(diff), app_stats);
|
||||
}
|
||||
|
||||
auto m4 = m1;
|
||||
{
|
||||
auto diff = m2.partition().difference(s, m1.partition());
|
||||
auto diff = m2.partition().difference(*s, m1.partition());
|
||||
m4.partition().apply(*m4.schema(), std::move(diff), app_stats);
|
||||
}
|
||||
|
||||
@@ -1881,7 +1881,7 @@ SEASTAR_TEST_CASE(test_collection_cell_diff) {
|
||||
mutation m12 = m1;
|
||||
m12.apply(m2);
|
||||
|
||||
auto diff = m12.partition().difference(s, m1.partition());
|
||||
auto diff = m12.partition().difference(*s, m1.partition());
|
||||
BOOST_REQUIRE(!diff.empty());
|
||||
BOOST_REQUIRE(m2.partition().equal(*s, diff));
|
||||
});
|
||||
@@ -1919,11 +1919,11 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
|
||||
auto m12 = m1;
|
||||
m12.apply(m2);
|
||||
auto m12_with_diff = m1;
|
||||
m12_with_diff.partition().apply(*s, m2.partition().difference(s, m1.partition()), app_stats);
|
||||
m12_with_diff.partition().apply(*s, m2.partition().difference(*s, m1.partition()), app_stats);
|
||||
check_partitions_match(m12.partition(), m12_with_diff.partition(), *s);
|
||||
check_partitions_match(mutation_partition{s}, m1.partition().difference(s, m1.partition()), *s);
|
||||
check_partitions_match(m1.partition(), m1.partition().difference(s, mutation_partition{s}), *s);
|
||||
check_partitions_match(mutation_partition{s}, mutation_partition{s}.difference(s, m1.partition()), *s);
|
||||
check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
|
||||
check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
|
||||
check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2102,7 +2102,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) {
|
||||
auto to_apply = mutation_partition_v2(s, m2_v2);
|
||||
|
||||
apply_resume res;
|
||||
while (result_v2.apply_monotonically(s, std::move(to_apply), no_cache_tracker, app_stats,
|
||||
while (result_v2.apply_monotonically(s, s, std::move(to_apply), no_cache_tracker, app_stats,
|
||||
[&] () noexcept { return preempt(); }, res, is_evictable::no) == stop_iteration::no) {
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
@@ -2132,7 +2132,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) {
|
||||
|
||||
static void clear(cache_tracker& tracker, const schema& s, mutation_partition_v2& p) {
|
||||
while (p.clear_gently(&tracker) == stop_iteration::no) {}
|
||||
p = mutation_partition_v2(s.shared_from_this());
|
||||
p = mutation_partition_v2(s);
|
||||
tracker.insert(p);
|
||||
}
|
||||
|
||||
@@ -2200,7 +2200,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_evictable_snapshot) {
|
||||
clear(tracker, s, result_v2);
|
||||
});
|
||||
apply_resume res;
|
||||
while (result_v2.apply_monotonically(s, std::move(m2_v2), &tracker, app_stats, is_preemptible::yes, res,
|
||||
while (result_v2.apply_monotonically(s, s, std::move(m2_v2), &tracker, app_stats, default_preemption_check(), res,
|
||||
is_evictable::yes) == stop_iteration::no) {
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
@@ -2285,7 +2285,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) {
|
||||
|
||||
mutation_application_stats app_stats;
|
||||
apply_resume resume;
|
||||
m1_v2.apply_monotonically(s, std::move(m2_v2), nullptr, app_stats, is_preemptible::no, resume,
|
||||
m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume,
|
||||
is_evictable::yes);
|
||||
|
||||
BOOST_REQUIRE(m1_v2.is_fully_continuous());
|
||||
@@ -2306,7 +2306,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) {
|
||||
// m2_v2: --------------- 5 ==(rt)== 7 [rt] ---
|
||||
// m1_v2: === 1 === 3 =========================
|
||||
|
||||
m1_v2.apply_monotonically(s, std::move(m2_v2), nullptr, app_stats, is_preemptible::no, resume, is_evictable::yes);
|
||||
m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume, is_evictable::yes);
|
||||
|
||||
BOOST_REQUIRE(m1_v2.is_fully_continuous());
|
||||
assert_that(ss.schema(), m1_v2).is_equal_to_compacted(s, (m1 + m2).partition());
|
||||
|
||||
@@ -41,7 +41,7 @@ static thread_local mutation_application_stats app_stats_for_tests;
|
||||
// The cursor must be pointing at a row and valid.
|
||||
// The cursor will not be pointing at a row after this.
|
||||
static mutation_partition read_partition_from(const schema& schema, partition_snapshot_row_cursor& cur) {
|
||||
mutation_partition p(schema.shared_from_this());
|
||||
mutation_partition p(schema);
|
||||
position_in_partition prev = position_in_partition::before_all_clustered_rows();
|
||||
do {
|
||||
testlog.trace("cur: {}", cur);
|
||||
@@ -170,14 +170,14 @@ public:
|
||||
|
||||
void upgrade(schema_ptr new_schema) {
|
||||
_container.allocate_in_region([&] {
|
||||
_e.upgrade(_s, new_schema, _container.cleaner(), _container.tracker());
|
||||
_e.upgrade(_container.region(), new_schema, _container.cleaner(), _container.tracker());
|
||||
_s = new_schema;
|
||||
});
|
||||
}
|
||||
|
||||
partition_snapshot_ptr read() {
|
||||
return _container.allocate_in_region([&] {
|
||||
return _e.read(region(), _container.cleaner(), schema(), _container.tracker(), _container.phase());
|
||||
return _e.read(region(), _container.cleaner(), _container.tracker(), _container.phase());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -194,7 +194,7 @@ void mvcc_partition::apply_to_evictable(partition_entry&& src, schema_ptr src_sc
|
||||
mutation_cleaner src_cleaner(region(), no_cache_tracker, app_stats_for_tests);
|
||||
auto c = as(region(), [&] {
|
||||
if (_s != src_schema) {
|
||||
src.upgrade(src_schema, _s, src_cleaner, no_cache_tracker);
|
||||
src.upgrade(region(), _s, src_cleaner, no_cache_tracker);
|
||||
}
|
||||
return _e.apply_to_incomplete(*schema(), std::move(src), src_cleaner, as, region(),
|
||||
*_container.tracker(), _container.next_phase(), _container.accounter());
|
||||
@@ -221,7 +221,7 @@ mvcc_partition& mvcc_partition::operator+=(const mutation& m) {
|
||||
void mvcc_partition::apply(const mutation_partition& mp, schema_ptr mp_s) {
|
||||
with_allocator(region().allocator(), [&] {
|
||||
if (_evictable) {
|
||||
apply_to_evictable(partition_entry(mutation_partition_v2(*mp_s, mp)), mp_s);
|
||||
apply_to_evictable(partition_entry(*mp_s, mutation_partition_v2(*mp_s, mp)), mp_s);
|
||||
} else {
|
||||
logalloc::allocating_section as;
|
||||
as(region(), [&] {
|
||||
@@ -249,7 +249,7 @@ mvcc_partition mvcc_container::make_not_evictable(const mutation_partition& mp)
|
||||
return with_allocator(region().allocator(), [&] {
|
||||
logalloc::allocating_section as;
|
||||
return as(region(), [&] {
|
||||
return mvcc_partition(_schema, partition_entry(mutation_partition_v2(*_schema, mp)), *this, false);
|
||||
return mvcc_partition(_schema, partition_entry(*_schema, mutation_partition_v2(*_schema, mp)), *this, false);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -304,7 +304,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete) {
|
||||
assert_that(table.schema(), e.squashed()).is_equal_to((m2 + m3).partition());
|
||||
|
||||
// Check that snapshot data is not stolen when its entry is applied
|
||||
auto e2 = ms.make_evictable(mutation_partition(table.schema()));
|
||||
auto e2 = ms.make_evictable(mutation_partition(s));
|
||||
e2 += std::move(e);
|
||||
assert_that(table.schema(), ms.squashed(snap1)).is_equal_to(m1.partition());
|
||||
assert_that(table.schema(), e2.squashed()).is_equal_to((m2 + m3).partition());
|
||||
@@ -381,7 +381,7 @@ SEASTAR_TEST_CASE(test_eviction_with_active_reader) {
|
||||
auto ck1 = table.make_ckey(1);
|
||||
auto ck2 = table.make_ckey(2);
|
||||
|
||||
auto e = ms.make_evictable(mutation_partition(table.schema()));
|
||||
auto e = ms.make_evictable(mutation_partition(s));
|
||||
|
||||
mutation m1(table.schema(), pk);
|
||||
m1.partition().clustered_row(s, ck2);
|
||||
@@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
|
||||
}
|
||||
|
||||
auto expected = mutation_partition(*s, before);
|
||||
expected.apply_weak(*s, std::move(expected_to_apply_slice), app_stats);
|
||||
expected.apply(*s, std::move(expected_to_apply_slice), app_stats);
|
||||
|
||||
e += to_apply;
|
||||
|
||||
@@ -505,7 +505,7 @@ void evict_with_consistency_check(mvcc_container& ms, mvcc_partition& e, const m
|
||||
testlog.trace("evicting");
|
||||
auto ret = ms.tracker()->evict_from_lru_shallow();
|
||||
|
||||
testlog.trace("entry: {}", partition_entry::printer(s, e.entry()));
|
||||
testlog.trace("entry: {}", partition_entry::printer(e.entry()));
|
||||
|
||||
auto p = e.squashed();
|
||||
auto cont = p.get_continuity(s);
|
||||
@@ -553,7 +553,7 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
|
||||
auto snap2 = e.read();
|
||||
e += m3;
|
||||
|
||||
testlog.trace("e: {}", partition_entry::printer(*e.schema(), e.entry()));
|
||||
testlog.trace("e: {}", partition_entry::printer(e.entry()));
|
||||
|
||||
auto expected = e.squashed();
|
||||
auto snap = e.read();
|
||||
@@ -592,14 +592,14 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging_for_nonevictab
|
||||
{
|
||||
mutation_application_stats app_stats;
|
||||
logalloc::reclaim_lock rl(r);
|
||||
auto e = partition_entry(mutation_partition_v2(*s, m3.partition()));
|
||||
auto snap1 = e.read(r, cleaner, s, no_cache_tracker);
|
||||
auto e = partition_entry(*s, mutation_partition_v2(*s, m3.partition()));
|
||||
auto snap1 = e.read(r, cleaner, no_cache_tracker);
|
||||
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
|
||||
auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
|
||||
auto snap2 = e.read(r, cleaner, no_cache_tracker);
|
||||
e.apply(r, cleaner, *s, m1.partition(), *s, app_stats);
|
||||
|
||||
auto expected = e.squashed(*s, is_evictable::no);
|
||||
auto snap = e.read(r, cleaner, s, no_cache_tracker);
|
||||
auto snap = e.read(r, cleaner, no_cache_tracker);
|
||||
auto actual = read_using_cursor(*snap);
|
||||
|
||||
BOOST_REQUIRE(expected.is_fully_continuous());
|
||||
@@ -636,7 +636,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) {
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
auto e = partition_entry::make_evictable(*s, m1.partition());
|
||||
auto snap1 = e.read(r, tracker.cleaner(), s, &tracker);
|
||||
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
|
||||
e.add_version(*s, &tracker).partition()
|
||||
.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no);
|
||||
e.add_version(*s, &tracker).partition()
|
||||
@@ -646,7 +646,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) {
|
||||
expected.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no);
|
||||
expected.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no);
|
||||
|
||||
auto snap = e.read(r, tracker.cleaner(), s, &tracker);
|
||||
auto snap = e.read(r, tracker.cleaner(), &tracker);
|
||||
auto actual = read_using_cursor(*snap);
|
||||
auto actual2 = e.squashed(*s, is_evictable::yes);
|
||||
|
||||
@@ -670,8 +670,8 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(s));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
@@ -683,7 +683,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
|
||||
|
||||
partition_snapshot_row_cursor cur(s, *snap2);
|
||||
position_in_partition::equal_compare eq(s);
|
||||
@@ -841,8 +841,8 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(s));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
|
||||
|
||||
int ck_0 = 10;
|
||||
int ck_1 = 9;
|
||||
@@ -862,7 +862,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) {
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
|
||||
|
||||
auto rev_s = s.make_reversed();
|
||||
partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true);
|
||||
@@ -1032,9 +1032,9 @@ SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(s));
|
||||
tracker.insert(e);
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
@@ -1044,7 +1044,7 @@ SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) {
|
||||
p1.clustered_rows_entry(s, table.make_ckey(4), is_dummy::no, is_continuous::no));
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
@@ -1173,7 +1173,7 @@ public:
|
||||
, _tracker(t)
|
||||
, _r(r)
|
||||
, _e(_tracker ? partition_entry::make_evictable(*_schema, mutation_partition::make_incomplete(*_schema))
|
||||
: partition_entry(mutation_partition_v2(_schema)))
|
||||
: partition_entry(*_schema, mutation_partition_v2(*_schema)))
|
||||
{
|
||||
if (_tracker) {
|
||||
_tracker->insert(_e);
|
||||
@@ -1181,7 +1181,7 @@ public:
|
||||
}
|
||||
|
||||
partition_entry_builder& new_version() {
|
||||
_snapshots.emplace_back(_e.read(_r, _cleaner, _schema, _tracker, _snapshots.size()));
|
||||
_snapshots.emplace_back(_e.read(_r, _cleaner, _tracker, _snapshots.size()));
|
||||
_last_key = {};
|
||||
return *this;
|
||||
}
|
||||
@@ -1537,8 +1537,8 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(s));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
@@ -1547,7 +1547,7 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) {
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
@@ -1593,8 +1593,8 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reverse
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(s));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
@@ -1604,7 +1604,7 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reverse
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1);
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
@@ -1654,7 +1654,7 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) {
|
||||
while (true) {
|
||||
logalloc::reclaim_lock rl(r);
|
||||
mutation_partition_v2 m2 = mutation_partition_v2(*second.schema(), second.partition());
|
||||
auto e = partition_entry(mutation_partition_v2(*target.schema(), target.partition()));
|
||||
auto e = partition_entry(*target.schema(), mutation_partition_v2(*target.schema(), target.partition()));
|
||||
//auto snap1 = e.read(r, gen.schema());
|
||||
|
||||
alloc.fail_after(fail_offset++);
|
||||
@@ -1703,8 +1703,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
|
||||
m3.partition().make_fully_continuous();
|
||||
|
||||
{
|
||||
auto e = partition_entry(mutation_partition_v2(*s, m1.partition()));
|
||||
auto snap1 = e.read(r, cleaner, s, nullptr);
|
||||
auto e = partition_entry(*s, mutation_partition_v2(*s, m1.partition()));
|
||||
auto snap1 = e.read(r, cleaner, nullptr);
|
||||
|
||||
{
|
||||
mutation_application_stats app_stats;
|
||||
@@ -1712,7 +1712,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
|
||||
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, cleaner, s, nullptr);
|
||||
auto snap2 = e.read(r, cleaner, nullptr);
|
||||
|
||||
snap1 = {};
|
||||
snap2 = {};
|
||||
@@ -1724,8 +1724,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
|
||||
}
|
||||
|
||||
{
|
||||
auto e = partition_entry(mutation_partition_v2(*s, m1.partition()));
|
||||
auto snap1 = e.read(r, cleaner, s, nullptr);
|
||||
auto e = partition_entry(*s, mutation_partition_v2(*s, m1.partition()));
|
||||
auto snap1 = e.read(r, cleaner, nullptr);
|
||||
|
||||
{
|
||||
mutation_application_stats app_stats;
|
||||
@@ -1733,7 +1733,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
|
||||
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, cleaner, s, nullptr);
|
||||
auto snap2 = e.read(r, cleaner, nullptr);
|
||||
|
||||
snap2 = {};
|
||||
snap1 = {};
|
||||
@@ -2005,7 +2005,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) {
|
||||
}
|
||||
});
|
||||
|
||||
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(*s, e.entry()));
|
||||
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(e.entry()));
|
||||
|
||||
mutation m2(s, ss.make_pkey());
|
||||
// This one covers the dummy row for before(3) and before(2), marking the range [1, 3] as continuous.
|
||||
@@ -2020,7 +2020,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) {
|
||||
e += m3;
|
||||
auto snp3 = e.read();
|
||||
|
||||
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(*s, e.entry()));
|
||||
testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(e.entry()));
|
||||
|
||||
auto expected = m0 + m1 + m2 + m3;
|
||||
|
||||
@@ -2033,3 +2033,104 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) {
|
||||
evict_with_consistency_check(ms, e, expected.partition());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_gentle_schema_upgrades) {
|
||||
return seastar::async([] {
|
||||
auto ts1 = api::new_timestamp();
|
||||
auto ts_drop = api::new_timestamp();
|
||||
auto ts2 = api::new_timestamp();
|
||||
|
||||
auto s1 = schema_builder("ks", "cf")
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck", utf8_type, column_kind::clustering_key)
|
||||
.with_column("s1", utf8_type, column_kind::static_column)
|
||||
.with_column("s2", utf8_type, column_kind::static_column)
|
||||
.with_column("v1", utf8_type, column_kind::regular_column)
|
||||
.with_column("v2", utf8_type, column_kind::regular_column)
|
||||
.with_column("v3", utf8_type, column_kind::regular_column)
|
||||
.with_column("v4", utf8_type, column_kind::regular_column)
|
||||
.build();
|
||||
auto s2 = schema_builder(s1)
|
||||
.remove_column("s1")
|
||||
.remove_column("v3")
|
||||
.without_column("v4", ts_drop).with_column("v4", utf8_type)
|
||||
.with_column("v5", utf8_type)
|
||||
.build();
|
||||
|
||||
auto m1 = std::invoke([s1, ts1] {
|
||||
auto x = mutation(s1, partition_key::from_single_value(*s1, serialized(0)));
|
||||
auto ck = clustering_key::from_single_value(*s1, serialized(0));
|
||||
x.set_static_cell("s1", "s1_value", ts1);
|
||||
x.set_static_cell("s2", "s2_value", ts1);
|
||||
x.set_clustered_cell(ck, "v1", "v1_value", ts1);
|
||||
x.set_clustered_cell(ck, "v2", "v2_value", ts1);
|
||||
x.set_clustered_cell(ck, "v3", "v3_value", ts1);
|
||||
x.set_clustered_cell(ck, "v4", "v4_value", ts1);
|
||||
x.partition().set_static_row_continuous(false);
|
||||
x.partition().ensure_last_dummy(*s1);
|
||||
return x;
|
||||
});
|
||||
|
||||
auto m2 = std::invoke([s2, ts2] {
|
||||
auto x = mutation(s2, partition_key::from_single_value(*s2, serialized(0)));
|
||||
auto ck = clustering_key::from_single_value(*s2, serialized(0));
|
||||
x.set_clustered_cell(ck, "v2", "v2_value_new", ts2);
|
||||
x.set_clustered_cell(ck, "v5", "v5_value_new", ts2);
|
||||
x.partition().set_static_row_continuous(false);
|
||||
x.partition().ensure_last_dummy(*s2);
|
||||
return x;
|
||||
});
|
||||
|
||||
auto expected = std::invoke([s2, ts1, ts2] {
|
||||
auto x = mutation(s2, partition_key::from_single_value(*s2, serialized(0)));
|
||||
auto ck = clustering_key::from_single_value(*s2, serialized(0));
|
||||
x.set_static_cell("s2", "s2_value", ts1);
|
||||
x.set_clustered_cell(ck, "v1", "v1_value", ts1);
|
||||
x.set_clustered_cell(ck, "v2", "v2_value_new", ts2);
|
||||
x.set_clustered_cell(ck, "v5", "v5_value_new", ts2);
|
||||
x.partition().set_static_row_continuous(false);
|
||||
x.partition().ensure_last_dummy(*s2);
|
||||
return x;
|
||||
});
|
||||
|
||||
{
|
||||
// Test that the version merge is lazy.
|
||||
// (This is not important and might be changed in the future.
|
||||
// We often run some operations synchronously and only put them
|
||||
// in the background after they preempt for the first time.)
|
||||
mvcc_container ms(s1);
|
||||
auto e = ms.make_evictable(m1.partition());
|
||||
e.upgrade(s2);
|
||||
BOOST_REQUIRE(e.entry().version()->next());
|
||||
// Test that the upgrade initiated the merge.
|
||||
ms.cleaner().drain().get();
|
||||
BOOST_REQUIRE(!e.entry().version()->next());
|
||||
}
|
||||
{
|
||||
// Test that the on-the-fly merge gives the expected result.
|
||||
mvcc_container ms(s1);
|
||||
auto e = ms.make_evictable(m1.partition());
|
||||
auto rd1 = e.read();
|
||||
e.upgrade(s2);
|
||||
auto rd2 = e.read();
|
||||
e += m2;
|
||||
auto rd3 = e.read();
|
||||
|
||||
assert_that(s1, read_using_cursor(*rd1)).is_equal_to(*s1, m1.partition());
|
||||
|
||||
auto rd2_expected = mutation_partition(*s1, m1.partition());
|
||||
rd2_expected.upgrade(*s1, *s2);
|
||||
assert_that(s2, read_using_cursor(*rd2)).is_equal_to(rd2_expected);
|
||||
|
||||
assert_that(s2, read_using_cursor(*rd3)).is_equal_to(*s2, expected.partition());
|
||||
|
||||
rd1 = {};
|
||||
rd2 = {};
|
||||
// Merge versions.
|
||||
ms.cleaner().drain().get();
|
||||
BOOST_REQUIRE(!e.entry().version()->next());
|
||||
// Test that the background merge gives the expected result.
|
||||
assert_that(s2, read_using_cursor(*rd3)).is_equal_to(*s2, expected.partition());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -950,6 +950,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) {
|
||||
rd.fill_buffer().get();
|
||||
}
|
||||
|
||||
tracker.cleaner().drain().get0();
|
||||
while (tracker.region().evict_some() == memory::reclaiming_result::reclaimed_something) ;
|
||||
|
||||
// The partition should be evictable after schema change
|
||||
@@ -3508,6 +3509,16 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
|
||||
auto m2 = gen();
|
||||
m2.partition().make_fully_continuous();
|
||||
|
||||
bool upgrade_schema = tests::random::get_bool();
|
||||
if (upgrade_schema) {
|
||||
schema_ptr new_schema = schema_builder(s)
|
||||
.with_column(to_bytes("_phantom"), byte_type)
|
||||
.remove_column("_phantom")
|
||||
.build();
|
||||
m2.upgrade(new_schema);
|
||||
cache.set_schema(new_schema);
|
||||
}
|
||||
|
||||
auto mt = make_lw_shared<replica::memtable>(m2.schema());
|
||||
mt->apply(m2);
|
||||
cache.update(row_cache::external_updater([&] () noexcept {
|
||||
|
||||
Reference in New Issue
Block a user