mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge "Materialized views: Fixes to update generation" from Duarte
" Fixes to several issues around view update generation, pertaining to timestamp and TTL management. Fixes #3361 Fixes #3360 Fixes #3140 Refs #3362 Tests: unit(release, debug), dtest(materialized_views.py) " Reviewed-by: Nadav Har'El <nyh@scylladb.com> * 'materialized-views/fixes-galore/v2' of http://github.com/duarten/scylla: mutation_partition: Clarify comment about emptiness tests: Add view_complex_test tests/view_schema_test: Complete test db/view: Move cells instead of copying in add_cells_to_view() db/view: Handle unselected base columns and corner cases mutation_partition: Regular base column in view determines row liveness db/view: Don't avoid read-before-write when view PK matches base db/view: Process base updates to column unselected by its views db/view: Consider partition tombstone when generating updates tests/view_schema_test: Remove unneeded test mutation_fragment: Allow querying if row is live view_info: Add view_column() overload view_info: Explicitly initialize base-dependent fields cql3/alter_table_statement: Forbid dropping columns of MV base tables
This commit is contained in:
@@ -274,6 +274,7 @@ scylla_tests = [
|
||||
'tests/virtual_reader_test',
|
||||
'tests/view_schema_test',
|
||||
'tests/view_build_test',
|
||||
'tests/view_complex_test',
|
||||
'tests/counter_test',
|
||||
'tests/cell_locker_test',
|
||||
'tests/row_locker_test',
|
||||
|
||||
@@ -305,14 +305,10 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
}
|
||||
}
|
||||
|
||||
// If a column is dropped which is included in a view, we don't allow the drop to take place.
|
||||
auto view_names = ::join(", ", cf.views()
|
||||
| boost::adaptors::filtered([&] (auto&& v) { return bool(v->get_column_definition(column_name->name())); })
|
||||
| boost::adaptors::transformed([] (auto&& v) { return v->cf_name(); }));
|
||||
if (!view_names.empty()) {
|
||||
if (!cf.views().empty()) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Cannot drop column %s, depended on by materialized views (%s.{%s})",
|
||||
column_name, keyspace(), view_names));
|
||||
"Cannot drop column %s on base table %s.%s with materialized views",
|
||||
column_name, keyspace(), column_family()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -4275,6 +4275,7 @@ static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, c
|
||||
}
|
||||
|
||||
void column_family::add_or_update_view(view_ptr v) {
|
||||
v->view_info()->initialize_base_dependent_fields(*schema());
|
||||
auto existing = find_view(_views, v);
|
||||
if (existing != _views.end()) {
|
||||
*existing = std::move(v);
|
||||
|
||||
228
db/view/view.cc
228
db/view/view.cc
@@ -111,21 +111,25 @@ const column_definition* view_info::view_column(const schema& base, column_id ba
|
||||
// FIXME: Map base column_ids to view_column_ids, which can be something like
|
||||
// a boost::small_vector where the position is the base column_id, and the
|
||||
// value is either empty or the view's column_id.
|
||||
return _schema.get_column_definition(base.regular_column_at(base_id).name());
|
||||
return view_column(base.regular_column_at(base_id));
|
||||
}
|
||||
|
||||
stdx::optional<column_id> view_info::base_non_pk_column_in_view_pk(const schema& base) const {
|
||||
if (!_base_non_pk_column_in_view_pk) {
|
||||
_base_non_pk_column_in_view_pk.emplace(stdx::nullopt);
|
||||
for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) {
|
||||
auto* base_col = base.get_column_definition(view_col.name());
|
||||
if (!base_col->is_primary_key()) {
|
||||
_base_non_pk_column_in_view_pk.emplace(base_col->id);
|
||||
break;
|
||||
}
|
||||
const column_definition* view_info::view_column(const column_definition& base_def) const {
|
||||
return _schema.get_column_definition(base_def.name());
|
||||
}
|
||||
|
||||
stdx::optional<column_id> view_info::base_non_pk_column_in_view_pk() const {
|
||||
return _base_non_pk_column_in_view_pk;
|
||||
}
|
||||
|
||||
void view_info::initialize_base_dependent_fields(const schema& base) {
|
||||
for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) {
|
||||
auto* base_col = base.get_column_definition(view_col.name());
|
||||
if (!base_col->is_primary_key()) {
|
||||
_base_non_pk_column_in_view_pk.emplace(base_col->id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return *_base_non_pk_column_in_view_pk;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
@@ -147,25 +151,7 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
|
||||
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
|
||||
// even if an update don't match a view condition on a regular column, that update can still invalidate a
|
||||
// pre-existing entry) - note that the upper layers should already have checked the partition key;
|
||||
// - the update doesn't modify any of the columns impacting the view (where "impacting" the view means that column
|
||||
// is neither included in the view, nor used by the view filter).
|
||||
if (!clustering_prefix_matches(base, view, key.key(), update.key())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We want to check if the update modifies any of the columns that are part of the view (in which case the view is
|
||||
// affected). But iff the view includes all the base table columns, or the update has either a row deletion or a
|
||||
// row marker, we know the view is affected right away.
|
||||
if (view.include_all_columns() || update.row().deleted_at() || update.row().marker().is_live()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool affected = false;
|
||||
update.row().cells().for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
affected = view.view_column(base, id);
|
||||
return stop_iteration(affected);
|
||||
});
|
||||
return affected;
|
||||
return clustering_prefix_matches(base, view, key.key(), update.key());
|
||||
}
|
||||
|
||||
static bool update_requires_read_before_write(const schema& base,
|
||||
@@ -174,14 +160,6 @@ static bool update_requires_read_before_write(const schema& base,
|
||||
const rows_entry& update) {
|
||||
for (auto&& v : views) {
|
||||
view_info& vf = *v->view_info();
|
||||
// A view whose primary key contains only the base's primary key columns doesn't require a read-before-write.
|
||||
// However, if the view has restrictions on regular columns, then a write that doesn't match those filters
|
||||
// needs to add a tombstone (assuming a previous update matched those filter and created a view entry); for
|
||||
// now we just do a read-before-write in that case.
|
||||
if (!vf.base_non_pk_column_in_view_pk(base)
|
||||
&& vf.select_statement().get_restrictions()->get_non_pk_restriction().empty()) {
|
||||
continue;
|
||||
}
|
||||
if (may_be_affected_by(base, vf, key, update)) {
|
||||
return true;
|
||||
}
|
||||
@@ -255,12 +233,12 @@ private:
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
|
||||
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
|
||||
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
create_entry(base_key, update, now);
|
||||
delete_old_entry(base_key, existing, row_tombstone(), now);
|
||||
delete_old_entry(base_key, existing, update, now);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -268,12 +246,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
/*
|
||||
* We need to compute both the timestamp and expiration.
|
||||
*
|
||||
* For the timestamp, it makes sense to use the bigger timestamp for all view PK columns.
|
||||
*
|
||||
* This is more complex for the expiration. We want to maintain consistency between the base and the view, so the
|
||||
* entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part
|
||||
* of the view PK.
|
||||
* Which means we really have 2 cases:
|
||||
* There are 3 cases:
|
||||
* 1) There is a column that is not in the base PK but is in the view PK. In that case, as long as that column
|
||||
* lives, the view entry does too, but as soon as it expires (or is deleted for that matter) the entry also
|
||||
* should expire. So the expiration for the view is the one of that column, regardless of any other expiration.
|
||||
@@ -284,38 +257,53 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
* UPDATE t SET c = 0 WHERE a = 0 AND b = 0;
|
||||
* then even after 3 seconds elapsed, the row will still exist (it just won't have a "row marker" anymore) and so
|
||||
* the MV should still have a corresponding entry.
|
||||
* 2) The columns for the base and view PKs are exactly the same. In that case, the view entry should live
|
||||
* as long as the base row lives. This means the view entry should only expire once *everything* in the
|
||||
* base row has expired. So, the row TTL should be the max of any other TTL. This is particularly important
|
||||
* in the case where the base row has a TTL, but a column *absent* from the view holds a greater TTL.
|
||||
* This cell determines the liveness of the view row.
|
||||
* 2) The columns for the base and view PKs are exactly the same, and all base columns are selected by the view.
|
||||
* In that case, all components (marker, deletion and cells) are the same and trivially mapped.
|
||||
* 3) The columns for the base and view PKs are exactly the same, but some base columns are not selected in the view.
|
||||
* Use the max timestamp out of the base row marker and all the unselected columns - this ensures we can keep the
|
||||
* view row alive. Do the same thing for the expiration, if the marker is dead or will expire, and so
|
||||
* will all unselected columns.
|
||||
*/
|
||||
|
||||
auto marker = base_row.marker();
|
||||
auto col_id = _view_info.base_non_pk_column_in_view_pk(*_base);
|
||||
auto col_id = _view_info.base_non_pk_column_in_view_pk();
|
||||
if (col_id) {
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto cell = base_row.cells().cell_at(*col_id).as_atomic_cell();
|
||||
auto timestamp = std::max(marker.timestamp(), cell.timestamp());
|
||||
return cell.is_live_and_has_ttl() ? row_marker(timestamp, cell.ttl(), cell.expiry()) : row_marker(timestamp);
|
||||
return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp());
|
||||
}
|
||||
|
||||
if (!marker.is_expiring()) {
|
||||
if (_view_info.include_all_columns()) {
|
||||
return marker;
|
||||
}
|
||||
|
||||
auto ttl = marker.ttl();
|
||||
auto expiry = marker.expiry();
|
||||
auto timestamp = marker.timestamp();
|
||||
bool has_non_expiring_live_cell = false;
|
||||
expiry_opt biggest_expiry;
|
||||
gc_clock::duration ttl = gc_clock::duration::min();
|
||||
if (marker.is_expiring()) {
|
||||
biggest_expiry = marker.expiry();
|
||||
ttl = marker.ttl();
|
||||
}
|
||||
auto maybe_update_expiry_and_ttl = [&] (atomic_cell_view&& cell) {
|
||||
// Note: Cassandra compares cell.ttl() here, but that seems very wrong.
|
||||
// See CASSANDRA-13127.
|
||||
if (cell.is_live_and_has_ttl() && cell.expiry() > expiry) {
|
||||
expiry = cell.expiry();
|
||||
ttl = cell.ttl();
|
||||
timestamp = std::max(timestamp, cell.timestamp());
|
||||
if (cell.is_live_and_has_ttl()) {
|
||||
if (cell.expiry() >= biggest_expiry.value_or(cell.expiry())) {
|
||||
biggest_expiry = cell.expiry();
|
||||
ttl = cell.ttl();
|
||||
}
|
||||
} else if (cell.is_live()) {
|
||||
has_non_expiring_live_cell = true;
|
||||
}
|
||||
};
|
||||
|
||||
// Iterate over regular cells not in the view, as we already have the timestamps of the included columns.
|
||||
base_row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
|
||||
auto& def = _base->regular_column_at(id);
|
||||
if (_view_info.view_column(def)) {
|
||||
return;
|
||||
}
|
||||
if (def.is_atomic()) {
|
||||
maybe_update_expiry_and_ttl(c.as_atomic_cell());
|
||||
} else {
|
||||
@@ -323,7 +311,13 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
}
|
||||
});
|
||||
|
||||
return row_marker(marker.timestamp(), ttl, expiry);
|
||||
if ((marker.is_live() && !marker.is_expiring()) || has_non_expiring_live_cell) {
|
||||
return row_marker(timestamp);
|
||||
}
|
||||
if (biggest_expiry) {
|
||||
return row_marker(timestamp, ttl, *biggest_expiry);
|
||||
}
|
||||
return marker;
|
||||
}
|
||||
|
||||
deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) {
|
||||
@@ -355,11 +349,11 @@ static const column_definition* view_column(const schema& base, const schema& vi
|
||||
return view.get_column_definition(base.regular_column_at(base_id).name());
|
||||
}
|
||||
|
||||
static void add_cells_to_view(const schema& base, const schema& view, const row& base_cells, row& view_cells) {
|
||||
base_cells.for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
|
||||
static void add_cells_to_view(const schema& base, const schema& view, row base_cells, row& view_cells) {
|
||||
base_cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& c) {
|
||||
auto* view_col = view_column(base, view, id);
|
||||
if (view_col && !view_col->is_primary_key()) {
|
||||
view_cells.append_cell(view_col->id, c);
|
||||
view_cells.append_cell(view_col->id, std::move(c));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -373,7 +367,8 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
return;
|
||||
}
|
||||
deletable_row& r = get_view_row(base_key, update);
|
||||
r.apply(compute_row_marker(update));
|
||||
auto marker = compute_row_marker(update);
|
||||
r.apply(marker);
|
||||
r.apply(update.tomb());
|
||||
add_cells_to_view(*_base, *_view, update.cells(), r.cells());
|
||||
}
|
||||
@@ -382,41 +377,51 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
* Deletes the view entry corresponding to the provided base row.
|
||||
* This method checks that the base row does match the view filter before bothering.
|
||||
*/
|
||||
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) {
|
||||
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
|
||||
// Before deleting an old entry, make sure it was matching the view filter
|
||||
// (otherwise there is nothing to delete)
|
||||
if (!is_partition_key_empty(*_base, *_view, base_key, existing) && matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
||||
do_delete_old_entry(base_key, existing, t, now);
|
||||
do_delete_old_entry(base_key, existing, update, now);
|
||||
}
|
||||
}
|
||||
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) {
|
||||
if (t) {
|
||||
get_view_row(base_key, existing).apply(t);
|
||||
return;
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
|
||||
auto& r = get_view_row(base_key, existing);
|
||||
auto col_id = _view_info.base_non_pk_column_in_view_pk();
|
||||
if (col_id) {
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto cell = existing.cells().cell_at(*col_id).as_atomic_cell();
|
||||
if (cell.is_live()) {
|
||||
r.apply(shadowable_tombstone(cell.timestamp(), now));
|
||||
}
|
||||
} else {
|
||||
auto ts = existing.marker().timestamp();
|
||||
auto set_max_ts = [&ts] (atomic_cell_view&& cell) {
|
||||
ts = std::max(ts, cell.timestamp());
|
||||
};
|
||||
if (!_view_info.include_all_columns()) {
|
||||
existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
auto& def = _base->regular_column_at(id);
|
||||
if (_view_info.view_column(def)) {
|
||||
return;
|
||||
}
|
||||
// Unselected columns are used regardless of being live or dead, since we don't know if
|
||||
// they were used to compute the view entry's row marker.
|
||||
if (def.is_atomic()) {
|
||||
set_max_ts(cell.as_atomic_cell());
|
||||
} else {
|
||||
collection_type_impl::for_each_cell(cell.as_collection_mutation(), set_max_ts);
|
||||
}
|
||||
});
|
||||
}
|
||||
auto marker = row_marker(tombstone(ts, now));
|
||||
r.apply(marker);
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r.cells());
|
||||
}
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
// FIXME: If the entry is "resurrected" by a later update, we would need to
|
||||
// ensure that the timestamp for the entry then is bigger than the tombstone
|
||||
// we're just inserting, which is not currently guaranteed. See CASSANDRA-11500
|
||||
// for details.
|
||||
auto ts = existing.marker().timestamp();
|
||||
auto set_max_ts = [&ts] (atomic_cell_view&& cell) {
|
||||
ts = std::max(ts, cell.timestamp());
|
||||
};
|
||||
existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
auto* def = _view_info.view_column(*_base, id);
|
||||
if (!def) {
|
||||
return;
|
||||
}
|
||||
if (def->is_atomic()) {
|
||||
set_max_ts(cell.as_atomic_cell());
|
||||
} else {
|
||||
collection_type_impl::for_each_cell(cell.as_collection_mutation(), set_max_ts);
|
||||
}
|
||||
});
|
||||
get_view_row(base_key, existing).apply(shadowable_tombstone(ts, now));
|
||||
r.apply(update.tomb());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -435,16 +440,17 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
|
||||
return;
|
||||
}
|
||||
if (is_partition_key_empty(*_base, *_view, base_key, update) || !matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
||||
do_delete_old_entry(base_key, existing, row_tombstone(), now);
|
||||
do_delete_old_entry(base_key, existing, update, now);
|
||||
return;
|
||||
}
|
||||
|
||||
deletable_row& r = get_view_row(base_key, update);
|
||||
r.apply(compute_row_marker(update));
|
||||
auto marker = compute_row_marker(update);
|
||||
r.apply(marker);
|
||||
r.apply(update.tomb());
|
||||
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, diff, r.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r.cells());
|
||||
}
|
||||
|
||||
void view_updates::generate_update(
|
||||
@@ -463,16 +469,16 @@ void view_updates::generate_update(
|
||||
return;
|
||||
}
|
||||
|
||||
auto col_id = _view_info.base_non_pk_column_in_view_pk(*_base);
|
||||
auto col_id = _view_info.base_non_pk_column_in_view_pk();
|
||||
if (!col_id) {
|
||||
// The view key is necessarily the same pre and post update.
|
||||
if (existing && !existing->empty()) {
|
||||
if (update.empty()) {
|
||||
delete_old_entry(base_key, *existing, update.tomb(), now);
|
||||
} else {
|
||||
if (existing && existing->is_live(*_base)) {
|
||||
if (update.is_live(*_base)) {
|
||||
update_entry(base_key, update, *existing, now);
|
||||
} else {
|
||||
delete_old_entry(base_key, *existing, update, now);
|
||||
}
|
||||
} else if (!update.empty()) {
|
||||
} else if (update.is_live(*_base)) {
|
||||
create_entry(base_key, update, now);
|
||||
}
|
||||
return;
|
||||
@@ -491,7 +497,7 @@ void view_updates::generate_update(
|
||||
replace_entry(base_key, update, *existing, now);
|
||||
}
|
||||
} else {
|
||||
delete_old_entry(base_key, *existing, update.tomb(), now);
|
||||
delete_old_entry(base_key, *existing, update, now);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -599,13 +605,13 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona
|
||||
|
||||
// We allow existing to be disengaged, which we treat the same as an empty row.
|
||||
if (existing) {
|
||||
existing->marker().compact_and_expire(tombstone(), _now, always_gc, gc_before);
|
||||
existing->cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before);
|
||||
existing->marker().compact_and_expire(existing->tomb().tomb(), _now, always_gc, gc_before);
|
||||
existing->cells().compact_and_expire(*_schema, column_kind::regular_column, existing->tomb(), _now, always_gc, gc_before, existing->marker());
|
||||
update.apply(*_schema, *existing);
|
||||
}
|
||||
|
||||
update.marker().compact_and_expire(tombstone(), _now, always_gc, gc_before);
|
||||
update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before);
|
||||
update.marker().compact_and_expire(update.tomb().tomb(), _now, always_gc, gc_before);
|
||||
update.cells().compact_and_expire(*_schema, column_kind::regular_column, update.tomb(), _now, always_gc, gc_before, update.marker());
|
||||
|
||||
for (auto&& v : _view_updates) {
|
||||
v.generate_update(_key, update, existing, _now);
|
||||
@@ -613,9 +619,7 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona
|
||||
}
|
||||
|
||||
static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clustering_row& row) {
|
||||
for (auto&& rt : tracker.range_tombstones_for_row(row.key())) {
|
||||
row.apply(rt.tomb);
|
||||
}
|
||||
row.apply(tracker.tombstone_for_row(row.key()));
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::on_results() {
|
||||
|
||||
@@ -215,7 +215,7 @@ public:
|
||||
}
|
||||
t.apply(current_tombstone);
|
||||
bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before);
|
||||
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before);
|
||||
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker());
|
||||
if (only_live() && is_live) {
|
||||
partition_is_not_empty(consumer);
|
||||
auto stop = consumer.consume(std::move(cr), t, true);
|
||||
|
||||
@@ -75,6 +75,11 @@ public:
|
||||
return !_t && _marker.is_missing() && _cells.empty();
|
||||
}
|
||||
|
||||
bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const {
|
||||
base_tombstone.apply(_t.tomb());
|
||||
return _marker.is_live(base_tombstone, now) || _cells.is_live(s, column_kind::regular_column, base_tombstone, now);
|
||||
}
|
||||
|
||||
void apply(const schema& s, clustering_row&& cr) {
|
||||
_marker.apply(std::move(cr._marker));
|
||||
_t.apply(cr._t, _marker);
|
||||
@@ -138,6 +143,10 @@ public:
|
||||
return _cells.empty();
|
||||
}
|
||||
|
||||
bool is_live(const schema& s, gc_clock::time_point now = gc_clock::time_point::min()) const {
|
||||
return _cells.is_live(s, column_kind::static_column, tombstone(), now);
|
||||
}
|
||||
|
||||
void apply(const schema& s, const row& r) {
|
||||
_cells.apply(s, column_kind::static_column, r);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include "intrusive_set_external_comparator.hh"
|
||||
#include "counters.hh"
|
||||
#include "row_cache.hh"
|
||||
#include "view_info.hh"
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
|
||||
template<bool reversed>
|
||||
@@ -1264,8 +1265,8 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
||||
deletable_row& row = e.row();
|
||||
row_tombstone tomb = tombstone_for_row(s, e);
|
||||
|
||||
bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before);
|
||||
is_live |= row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before);
|
||||
bool is_live = row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before);
|
||||
is_live |= row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before, row.marker());
|
||||
|
||||
if (should_purge_row_tombstone(row.deleted_at())) {
|
||||
row.remove_tombstone();
|
||||
@@ -1319,7 +1320,7 @@ void mutation_partition::compact_for_compaction(const schema& s,
|
||||
do_compact(s, compaction_time, all_rows, false, query::max_rows, can_gc);
|
||||
}
|
||||
|
||||
// Returns true if there is no live data or tombstones.
|
||||
// Returns true if the mutation_partition represents no writes.
|
||||
bool mutation_partition::empty() const
|
||||
{
|
||||
if (_tombstone.timestamp != api::missing_timestamp) {
|
||||
@@ -1335,8 +1336,12 @@ deletable_row::is_live(const schema& s, tombstone base_tombstone, gc_clock::time
|
||||
// row is live. Otherwise, a row is considered live if it has any cell
|
||||
// which is live.
|
||||
base_tombstone.apply(_deleted_at.tomb());
|
||||
return _marker.is_live(base_tombstone, query_time)
|
||||
|| has_any_live_data(s, column_kind::regular_column, _cells, base_tombstone, query_time);
|
||||
return _marker.is_live(base_tombstone, query_time) || _cells.is_live(s, column_kind::regular_column, base_tombstone, query_time);
|
||||
}
|
||||
|
||||
bool
|
||||
row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clock::time_point query_time) const {
|
||||
return has_any_live_data(s, kind, *this, base_tombstone, query_time);
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -1548,9 +1553,30 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
|
||||
});
|
||||
}
|
||||
|
||||
bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time,
|
||||
can_gc_fn& can_gc, gc_clock::time_point gc_before)
|
||||
// When views contain a primary key column that is not part of the base table primary key,
|
||||
// that column determines whether the row is live or not. We need to ensure that when that
|
||||
// cell is dead, and thus the derived row marker, either by normal deletion of by TTL, so
|
||||
// is the rest of the row. To ensure that none of the regular columns keep the row alive,
|
||||
// we erase the live cells according to the shadowable_tombstone rules.
|
||||
static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) {
|
||||
return s.is_view()
|
||||
&& s.view_info()->base_non_pk_column_in_view_pk()
|
||||
&& !marker.is_live()
|
||||
&& kind == column_kind::regular_column; // not applicable to static rows
|
||||
}
|
||||
|
||||
bool row::compact_and_expire(
|
||||
const schema& s,
|
||||
column_kind kind,
|
||||
row_tombstone tomb,
|
||||
gc_clock::time_point query_time,
|
||||
can_gc_fn& can_gc,
|
||||
gc_clock::time_point gc_before,
|
||||
const row_marker& marker)
|
||||
{
|
||||
if (dead_marker_shadows_row(s, kind, marker)) {
|
||||
tomb.apply(shadowable_tombstone(api::max_timestamp, gc_clock::time_point::max()), row_marker());
|
||||
}
|
||||
bool any_live = false;
|
||||
remove_if([&] (column_id id, atomic_cell_or_collection& c) {
|
||||
bool erase = false;
|
||||
@@ -1592,6 +1618,17 @@ bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone to
|
||||
return any_live;
|
||||
}
|
||||
|
||||
bool row::compact_and_expire(
|
||||
const schema& s,
|
||||
column_kind kind,
|
||||
row_tombstone tomb,
|
||||
gc_clock::time_point query_time,
|
||||
can_gc_fn& can_gc,
|
||||
gc_clock::time_point gc_before) {
|
||||
row_marker m;
|
||||
return compact_and_expire(s, kind, tomb, query_time, can_gc, gc_before, m);
|
||||
}
|
||||
|
||||
deletable_row deletable_row::difference(const schema& s, column_kind kind, const deletable_row& other) const
|
||||
{
|
||||
deletable_row dr;
|
||||
|
||||
@@ -314,8 +314,22 @@ public:
|
||||
// Expires cells based on query_time. Expires tombstones based on gc_before
|
||||
// and max_purgeable. Removes cells covered by tomb.
|
||||
// Returns true iff there are any live cells left.
|
||||
bool compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time,
|
||||
can_gc_fn&, gc_clock::time_point gc_before);
|
||||
bool compact_and_expire(
|
||||
const schema& s,
|
||||
column_kind kind,
|
||||
row_tombstone tomb,
|
||||
gc_clock::time_point query_time,
|
||||
can_gc_fn&,
|
||||
gc_clock::time_point gc_before,
|
||||
const row_marker& marker);
|
||||
|
||||
bool compact_and_expire(
|
||||
const schema& s,
|
||||
column_kind kind,
|
||||
row_tombstone tomb,
|
||||
gc_clock::time_point query_time,
|
||||
can_gc_fn&,
|
||||
gc_clock::time_point gc_before);
|
||||
|
||||
row difference(const schema&, column_kind, const row& other) const;
|
||||
|
||||
@@ -327,6 +341,8 @@ public:
|
||||
|
||||
void prepare_hash(const schema& s, column_kind kind) const;
|
||||
|
||||
bool is_live(const schema&, column_kind kind, tombstone tomb = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const row& r);
|
||||
};
|
||||
|
||||
@@ -1030,7 +1046,7 @@ public:
|
||||
// Range tombstones will be trimmed to the boundaries of the clustering ranges.
|
||||
mutation_partition sliced(const schema& s, const query::clustering_row_ranges&) const;
|
||||
|
||||
// Returns true if there is no live data or tombstones.
|
||||
// Returns true if the mutation_partition represents no writes.
|
||||
bool empty() const;
|
||||
public:
|
||||
deletable_row& clustered_row(const schema& s, const clustering_key& key);
|
||||
|
||||
1
test.py
1
test.py
@@ -90,6 +90,7 @@ boost_tests = [
|
||||
'cell_locker_test',
|
||||
'view_schema_test',
|
||||
'view_build_test',
|
||||
'view_complex_test',
|
||||
'clustering_ranges_walker_test',
|
||||
'vint_serialization_test',
|
||||
'duration_test',
|
||||
|
||||
1193
tests/view_complex_test.cc
Normal file
1193
tests/view_complex_test.cc
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1292,27 +1292,6 @@ SEASTAR_TEST_CASE(test_update) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_ignore_update) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c));").get();
|
||||
e.execute_cql("create materialized view mv as select p, c, v1 from cf "
|
||||
"where p is not null and c is not null and v1 is not null primary key (c, p)").get();
|
||||
|
||||
e.execute_cql("insert into cf (p, c, v2) values (0, 0, 0)").get();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select * from mv").get0();
|
||||
assert_that(msg).is_rows()
|
||||
.with_size(1);
|
||||
});
|
||||
e.execute_cql("update cf set v2 = 3 where p = 1 and c = 1").get();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select * from mv").get0();
|
||||
assert_that(msg).is_rows()
|
||||
.with_size(1);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_ttl) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v1 int, v2 int, v3 int, primary key (p, c));").get();
|
||||
@@ -2935,6 +2914,30 @@ void complex_timestamp_with_base_pk_columns_in_view_pk_deletion_test(cql_test_en
|
||||
assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(2)}, {int32_type->decompose(1)}, {}, {} }});
|
||||
});
|
||||
|
||||
// Reset values TS=10
|
||||
e.execute_cql("insert into cf (p, c, v1, v2) values (1, 2, 3, 4) using timestamp 10").get();
|
||||
maybe_flush();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf where p = 1 and c = 2").get0();
|
||||
assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(3)}, {int32_type->decompose(4)}, {long_type->decompose(10L)} }});
|
||||
});
|
||||
|
||||
// Update values TS=20
|
||||
e.execute_cql("update cf using timestamp 20 set v2 = 5 where p = 1 and c = 2").get();
|
||||
maybe_flush();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf where p = 1 and c = 2").get0();
|
||||
assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(3)}, {int32_type->decompose(5)}, {long_type->decompose(20L)} }});
|
||||
});
|
||||
|
||||
// Delete row TS=10
|
||||
e.execute_cql("delete from cf using timestamp 10 where p = 1 and c = 2").get();
|
||||
maybe_flush();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf").get0();
|
||||
assert_that(msg).is_rows().with_rows({{ { }, {int32_type->decompose(5)}, {long_type->decompose(20L)} }});
|
||||
});
|
||||
|
||||
e.execute_cql("drop materialized view vcf").get();
|
||||
e.execute_cql("drop table cf").get();
|
||||
}
|
||||
|
||||
@@ -33,8 +33,8 @@ class view_info final {
|
||||
mutable shared_ptr<cql3::statements::select_statement> _select_statement;
|
||||
mutable stdx::optional<query::partition_slice> _partition_slice;
|
||||
mutable stdx::optional<dht::partition_range_vector> _partition_ranges;
|
||||
// Lazily initializes the column id of a regular base table included in the view's PK, if any.
|
||||
mutable stdx::optional<stdx::optional<column_id>> _base_non_pk_column_in_view_pk;
|
||||
// Id of a regular base table column included in the view's PK, if any.
|
||||
mutable stdx::optional<column_id> _base_non_pk_column_in_view_pk;
|
||||
public:
|
||||
view_info(const schema& schema, const raw_view_info& raw_view_info);
|
||||
|
||||
@@ -62,7 +62,9 @@ public:
|
||||
const query::partition_slice& partition_slice() const;
|
||||
const dht::partition_range_vector& partition_ranges() const;
|
||||
const column_definition* view_column(const schema& base, column_id base_id) const;
|
||||
stdx::optional<column_id> base_non_pk_column_in_view_pk(const schema& base) const;
|
||||
const column_definition* view_column(const column_definition& base_def) const;
|
||||
stdx::optional<column_id> base_non_pk_column_in_view_pk() const;
|
||||
void initialize_base_dependent_fields(const schema& base);
|
||||
|
||||
friend bool operator==(const view_info& x, const view_info& y) {
|
||||
return x._raw == y._raw;
|
||||
|
||||
Reference in New Issue
Block a user