diff --git a/configure.py b/configure.py index 47c8f227f0..31b4bd54e0 100755 --- a/configure.py +++ b/configure.py @@ -274,6 +274,7 @@ scylla_tests = [ 'tests/virtual_reader_test', 'tests/view_schema_test', 'tests/view_build_test', + 'tests/view_complex_test', 'tests/counter_test', 'tests/cell_locker_test', 'tests/row_locker_test', diff --git a/cql3/statements/alter_table_statement.cc b/cql3/statements/alter_table_statement.cc index 34d35d7247..f1e771e4d4 100644 --- a/cql3/statements/alter_table_statement.cc +++ b/cql3/statements/alter_table_statement.cc @@ -305,14 +305,10 @@ future> alter_table_statement::a } } - // If a column is dropped which is included in a view, we don't allow the drop to take place. - auto view_names = ::join(", ", cf.views() - | boost::adaptors::filtered([&] (auto&& v) { return bool(v->get_column_definition(column_name->name())); }) - | boost::adaptors::transformed([] (auto&& v) { return v->cf_name(); })); - if (!view_names.empty()) { + if (!cf.views().empty()) { throw exceptions::invalid_request_exception(sprint( - "Cannot drop column %s, depended on by materialized views (%s.{%s})", - column_name, keyspace(), view_names)); + "Cannot drop column %s on base table %s.%s with materialized views", + column_name, keyspace(), column_family())); } break; } diff --git a/database.cc b/database.cc index c4e7b602bb..e348931672 100644 --- a/database.cc +++ b/database.cc @@ -4275,6 +4275,7 @@ static std::vector::iterator find_view(std::vector& views, c } void column_family::add_or_update_view(view_ptr v) { + v->view_info()->initialize_base_dependent_fields(*schema()); auto existing = find_view(_views, v); if (existing != _views.end()) { *existing = std::move(v); diff --git a/db/view/view.cc b/db/view/view.cc index 516e7cb9e0..bf169de1aa 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -111,21 +111,25 @@ const column_definition* view_info::view_column(const schema& base, column_id ba // FIXME: Map base column_ids to view_column_ids, which can be something like // a boost::small_vector where the position is the base column_id, and the // value is either empty or the view's column_id. - return _schema.get_column_definition(base.regular_column_at(base_id).name()); + return view_column(base.regular_column_at(base_id)); } -stdx::optional view_info::base_non_pk_column_in_view_pk(const schema& base) const { - if (!_base_non_pk_column_in_view_pk) { - _base_non_pk_column_in_view_pk.emplace(stdx::nullopt); - for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) { - auto* base_col = base.get_column_definition(view_col.name()); - if (!base_col->is_primary_key()) { - _base_non_pk_column_in_view_pk.emplace(base_col->id); - break; - } +const column_definition* view_info::view_column(const column_definition& base_def) const { + return _schema.get_column_definition(base_def.name()); +} + +stdx::optional view_info::base_non_pk_column_in_view_pk() const { + return _base_non_pk_column_in_view_pk; +} + +void view_info::initialize_base_dependent_fields(const schema& base) { + for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) { + auto* base_col = base.get_column_definition(view_col.name()); + if (!base_col->is_primary_key()) { + _base_non_pk_column_in_view_pk.emplace(base_col->id); + break; } } - return *_base_non_pk_column_in_view_pk; } namespace db { @@ -147,25 +151,7 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de // - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns: // even if an update don't match a view condition on a regular column, that update can still invalidate a // pre-existing entry) - note that the upper layers should already have checked the partition key; - // - the update doesn't modify any of the columns impacting the view (where "impacting" the view means that column - // is neither included in the view, nor used by the view filter). - if (!clustering_prefix_matches(base, view, key.key(), update.key())) { - return false; - } - - // We want to check if the update modifies any of the columns that are part of the view (in which case the view is - // affected). But iff the view includes all the base table columns, or the update has either a row deletion or a - // row marker, we know the view is affected right away. - if (view.include_all_columns() || update.row().deleted_at() || update.row().marker().is_live()) { - return true; - } - - bool affected = false; - update.row().cells().for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& cell) { - affected = view.view_column(base, id); - return stop_iteration(affected); - }); - return affected; + return clustering_prefix_matches(base, view, key.key(), update.key()); } static bool update_requires_read_before_write(const schema& base, @@ -174,14 +160,6 @@ static bool update_requires_read_before_write(const schema& base, const rows_entry& update) { for (auto&& v : views) { view_info& vf = *v->view_info(); - // A view whose primary key contains only the base's primary key columns doesn't require a read-before-write. - // However, if the view has restrictions on regular columns, then a write that doesn't match those filters - // needs to add a tombstone (assuming a previous update matched those filter and created a view entry); for - // now we just do a read-before-write in that case. - if (!vf.base_non_pk_column_in_view_pk(base) - && vf.select_statement().get_restrictions()->get_non_pk_restriction().empty()) { - continue; - } if (may_be_affected_by(base, vf, key, update)) { return true; } @@ -255,12 +233,12 @@ private: row_marker compute_row_marker(const clustering_row& base_row) const; deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update); void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); - void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now); - void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now); + void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); + void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now); void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { create_entry(base_key, update, now); - delete_old_entry(base_key, existing, row_tombstone(), now); + delete_old_entry(base_key, existing, update, now); } }; @@ -268,12 +246,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons /* * We need to compute both the timestamp and expiration. * - * For the timestamp, it makes sense to use the bigger timestamp for all view PK columns. - * - * This is more complex for the expiration. We want to maintain consistency between the base and the view, so the - * entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part - * of the view PK. - * Which means we really have 2 cases: + * There are 3 cases: * 1) There is a column that is not in the base PK but is in the view PK. In that case, as long as that column * lives, the view entry does too, but as soon as it expires (or is deleted for that matter) the entry also * should expire. So the expiration for the view is the one of that column, regardless of any other expiration. @@ -284,38 +257,53 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons * UPDATE t SET c = 0 WHERE a = 0 AND b = 0; * then even after 3 seconds elapsed, the row will still exist (it just won't have a "row marker" anymore) and so * the MV should still have a corresponding entry. - * 2) The columns for the base and view PKs are exactly the same. In that case, the view entry should live - * as long as the base row lives. This means the view entry should only expire once *everything* in the - * base row has expired. So, the row TTL should be the max of any other TTL. This is particularly important - * in the case where the base row has a TTL, but a column *absent* from the view holds a greater TTL. + * This cell determines the liveness of the view row. + * 2) The columns for the base and view PKs are exactly the same, and all base columns are selected by the view. + * In that case, all components (marker, deletion and cells) are the same and trivially mapped. + * 3) The columns for the base and view PKs are exactly the same, but some base columns are not selected in the view. + * Use the max timestamp out of the base row marker and all the unselected columns - this ensures we can keep the + * view row alive. Do the same thing for the expiration, if the marker is dead or will expire, and so + * will all unselected columns. */ auto marker = base_row.marker(); - auto col_id = _view_info.base_non_pk_column_in_view_pk(*_base); + auto col_id = _view_info.base_non_pk_column_in_view_pk(); if (col_id) { // Note: multi-cell columns can't be part of the primary key. auto cell = base_row.cells().cell_at(*col_id).as_atomic_cell(); - auto timestamp = std::max(marker.timestamp(), cell.timestamp()); - return cell.is_live_and_has_ttl() ? row_marker(timestamp, cell.ttl(), cell.expiry()) : row_marker(timestamp); + return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp()); } - if (!marker.is_expiring()) { + if (_view_info.include_all_columns()) { return marker; } - auto ttl = marker.ttl(); - auto expiry = marker.expiry(); + auto timestamp = marker.timestamp(); + bool has_non_expiring_live_cell = false; + expiry_opt biggest_expiry; + gc_clock::duration ttl = gc_clock::duration::min(); + if (marker.is_expiring()) { + biggest_expiry = marker.expiry(); + ttl = marker.ttl(); + } auto maybe_update_expiry_and_ttl = [&] (atomic_cell_view&& cell) { - // Note: Cassandra compares cell.ttl() here, but that seems very wrong. - // See CASSANDRA-13127. - if (cell.is_live_and_has_ttl() && cell.expiry() > expiry) { - expiry = cell.expiry(); - ttl = cell.ttl(); + timestamp = std::max(timestamp, cell.timestamp()); + if (cell.is_live_and_has_ttl()) { + if (cell.expiry() >= biggest_expiry.value_or(cell.expiry())) { + biggest_expiry = cell.expiry(); + ttl = cell.ttl(); + } + } else if (cell.is_live()) { + has_non_expiring_live_cell = true; } }; + // Iterate over regular cells not in the view, as we already have the timestamps of the included columns. base_row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) { auto& def = _base->regular_column_at(id); + if (_view_info.view_column(def)) { + return; + } if (def.is_atomic()) { maybe_update_expiry_and_ttl(c.as_atomic_cell()); } else { @@ -323,7 +311,13 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons } }); - return row_marker(marker.timestamp(), ttl, expiry); + if ((marker.is_live() && !marker.is_expiring()) || has_non_expiring_live_cell) { + return row_marker(timestamp); + } + if (biggest_expiry) { + return row_marker(timestamp, ttl, *biggest_expiry); + } + return marker; } deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) { @@ -355,11 +349,11 @@ static const column_definition* view_column(const schema& base, const schema& vi return view.get_column_definition(base.regular_column_at(base_id).name()); } -static void add_cells_to_view(const schema& base, const schema& view, const row& base_cells, row& view_cells) { - base_cells.for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) { +static void add_cells_to_view(const schema& base, const schema& view, row base_cells, row& view_cells) { + base_cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& c) { auto* view_col = view_column(base, view, id); if (view_col && !view_col->is_primary_key()) { - view_cells.append_cell(view_col->id, c); + view_cells.append_cell(view_col->id, std::move(c)); } }); } @@ -373,7 +367,8 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ return; } deletable_row& r = get_view_row(base_key, update); - r.apply(compute_row_marker(update)); + auto marker = compute_row_marker(update); + r.apply(marker); r.apply(update.tomb()); add_cells_to_view(*_base, *_view, update.cells(), r.cells()); } @@ -382,41 +377,51 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ * Deletes the view entry corresponding to the provided base row. * This method checks that the base row does match the view filter before bothering. */ -void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) { +void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) { // Before deleting an old entry, make sure it was matching the view filter // (otherwise there is nothing to delete) if (!is_partition_key_empty(*_base, *_view, base_key, existing) && matches_view_filter(*_base, _view_info, base_key, existing, now)) { - do_delete_old_entry(base_key, existing, t, now); + do_delete_old_entry(base_key, existing, update, now); } } -void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) { - if (t) { - get_view_row(base_key, existing).apply(t); - return; +void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) { + auto& r = get_view_row(base_key, existing); + auto col_id = _view_info.base_non_pk_column_in_view_pk(); + if (col_id) { + // We delete the old row using a shadowable row tombstone, making sure that + // the tombstone deletes everything in the row (or it might still show up). + // Note: multi-cell columns can't be part of the primary key. + auto cell = existing.cells().cell_at(*col_id).as_atomic_cell(); + if (cell.is_live()) { + r.apply(shadowable_tombstone(cell.timestamp(), now)); + } + } else { + auto ts = existing.marker().timestamp(); + auto set_max_ts = [&ts] (atomic_cell_view&& cell) { + ts = std::max(ts, cell.timestamp()); + }; + if (!_view_info.include_all_columns()) { + existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) { + auto& def = _base->regular_column_at(id); + if (_view_info.view_column(def)) { + return; + } + // Unselected columns are used regardless of being live or dead, since we don't know if + // they were used to compute the view entry's row marker. + if (def.is_atomic()) { + set_max_ts(cell.as_atomic_cell()); + } else { + collection_type_impl::for_each_cell(cell.as_collection_mutation(), set_max_ts); + } + }); + } + auto marker = row_marker(tombstone(ts, now)); + r.apply(marker); + auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); + add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); } - // We delete the old row using a shadowable row tombstone, making sure that - // the tombstone deletes everything in the row (or it might still show up). - // FIXME: If the entry is "resurrected" by a later update, we would need to - // ensure that the timestamp for the entry then is bigger than the tombstone - // we're just inserting, which is not currently guaranteed. See CASSANDRA-11500 - // for details. - auto ts = existing.marker().timestamp(); - auto set_max_ts = [&ts] (atomic_cell_view&& cell) { - ts = std::max(ts, cell.timestamp()); - }; - existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) { - auto* def = _view_info.view_column(*_base, id); - if (!def) { - return; - } - if (def->is_atomic()) { - set_max_ts(cell.as_atomic_cell()); - } else { - collection_type_impl::for_each_cell(cell.as_collection_mutation(), set_max_ts); - } - }); - get_view_row(base_key, existing).apply(shadowable_tombstone(ts, now)); + r.apply(update.tomb()); } /** @@ -435,16 +440,17 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_ return; } if (is_partition_key_empty(*_base, *_view, base_key, update) || !matches_view_filter(*_base, _view_info, base_key, update, now)) { - do_delete_old_entry(base_key, existing, row_tombstone(), now); + do_delete_old_entry(base_key, existing, update, now); return; } deletable_row& r = get_view_row(base_key, update); - r.apply(compute_row_marker(update)); + auto marker = compute_row_marker(update); + r.apply(marker); r.apply(update.tomb()); auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); - add_cells_to_view(*_base, *_view, diff, r.cells()); + add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); } void view_updates::generate_update( @@ -463,16 +469,16 @@ void view_updates::generate_update( return; } - auto col_id = _view_info.base_non_pk_column_in_view_pk(*_base); + auto col_id = _view_info.base_non_pk_column_in_view_pk(); if (!col_id) { // The view key is necessarily the same pre and post update. - if (existing && !existing->empty()) { - if (update.empty()) { - delete_old_entry(base_key, *existing, update.tomb(), now); - } else { + if (existing && existing->is_live(*_base)) { + if (update.is_live(*_base)) { update_entry(base_key, update, *existing, now); + } else { + delete_old_entry(base_key, *existing, update, now); } - } else if (!update.empty()) { + } else if (update.is_live(*_base)) { create_entry(base_key, update, now); } return; @@ -491,7 +497,7 @@ void view_updates::generate_update( replace_entry(base_key, update, *existing, now); } } else { - delete_old_entry(base_key, *existing, update.tomb(), now); + delete_old_entry(base_key, *existing, update, now); } return; } @@ -599,13 +605,13 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona // We allow existing to be disengaged, which we treat the same as an empty row. if (existing) { - existing->marker().compact_and_expire(tombstone(), _now, always_gc, gc_before); - existing->cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before); + existing->marker().compact_and_expire(existing->tomb().tomb(), _now, always_gc, gc_before); + existing->cells().compact_and_expire(*_schema, column_kind::regular_column, existing->tomb(), _now, always_gc, gc_before, existing->marker()); update.apply(*_schema, *existing); } - update.marker().compact_and_expire(tombstone(), _now, always_gc, gc_before); - update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before); + update.marker().compact_and_expire(update.tomb().tomb(), _now, always_gc, gc_before); + update.cells().compact_and_expire(*_schema, column_kind::regular_column, update.tomb(), _now, always_gc, gc_before, update.marker()); for (auto&& v : _view_updates) { v.generate_update(_key, update, existing, _now); @@ -613,9 +619,7 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona } static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clustering_row& row) { - for (auto&& rt : tracker.range_tombstones_for_row(row.key())) { - row.apply(rt.tomb); - } + row.apply(tracker.tombstone_for_row(row.key())); } future view_update_builder::on_results() { diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 712f5db039..9304e8c2db 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -215,7 +215,7 @@ public: } t.apply(current_tombstone); bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before); - is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before); + is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker()); if (only_live() && is_live) { partition_is_not_empty(consumer); auto stop = consumer.consume(std::move(cr), t, true); diff --git a/mutation_fragment.hh b/mutation_fragment.hh index 9ddcc13b61..aadca81999 100644 --- a/mutation_fragment.hh +++ b/mutation_fragment.hh @@ -75,6 +75,11 @@ public: return !_t && _marker.is_missing() && _cells.empty(); } + bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const { + base_tombstone.apply(_t.tomb()); + return _marker.is_live(base_tombstone, now) || _cells.is_live(s, column_kind::regular_column, base_tombstone, now); + } + void apply(const schema& s, clustering_row&& cr) { _marker.apply(std::move(cr._marker)); _t.apply(cr._t, _marker); @@ -138,6 +143,10 @@ public: return _cells.empty(); } + bool is_live(const schema& s, gc_clock::time_point now = gc_clock::time_point::min()) const { + return _cells.is_live(s, column_kind::static_column, tombstone(), now); + } + void apply(const schema& s, const row& r) { _cells.apply(s, column_kind::static_column, r); } diff --git a/mutation_partition.cc b/mutation_partition.cc index 61ef9da90b..0c5e65c5ef 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -35,6 +35,7 @@ #include "intrusive_set_external_comparator.hh" #include "counters.hh" #include "row_cache.hh" +#include "view_info.hh" #include template @@ -1264,8 +1265,8 @@ uint32_t mutation_partition::do_compact(const schema& s, deletable_row& row = e.row(); row_tombstone tomb = tombstone_for_row(s, e); - bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before); - is_live |= row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before); + bool is_live = row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before); + is_live |= row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before, row.marker()); if (should_purge_row_tombstone(row.deleted_at())) { row.remove_tombstone(); @@ -1319,7 +1320,7 @@ void mutation_partition::compact_for_compaction(const schema& s, do_compact(s, compaction_time, all_rows, false, query::max_rows, can_gc); } -// Returns true if there is no live data or tombstones. +// Returns true if the mutation_partition represents no writes. bool mutation_partition::empty() const { if (_tombstone.timestamp != api::missing_timestamp) { @@ -1335,8 +1336,12 @@ deletable_row::is_live(const schema& s, tombstone base_tombstone, gc_clock::time // row is live. Otherwise, a row is considered live if it has any cell // which is live. base_tombstone.apply(_deleted_at.tomb()); - return _marker.is_live(base_tombstone, query_time) - || has_any_live_data(s, column_kind::regular_column, _cells, base_tombstone, query_time); + return _marker.is_live(base_tombstone, query_time) || _cells.is_live(s, column_kind::regular_column, base_tombstone, query_time); +} + +bool +row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clock::time_point query_time) const { + return has_any_live_data(s, kind, *this, base_tombstone, query_time); } bool @@ -1548,9 +1553,30 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) { }); } -bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time, - can_gc_fn& can_gc, gc_clock::time_point gc_before) +// When views contain a primary key column that is not part of the base table primary key, +// that column determines whether the row is live or not. We need to ensure that when that +// cell is dead, and thus the derived row marker, either by normal deletion of by TTL, so +// is the rest of the row. To ensure that none of the regular columns keep the row alive, +// we erase the live cells according to the shadowable_tombstone rules. +static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) { + return s.is_view() + && s.view_info()->base_non_pk_column_in_view_pk() + && !marker.is_live() + && kind == column_kind::regular_column; // not applicable to static rows +} + +bool row::compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn& can_gc, + gc_clock::time_point gc_before, + const row_marker& marker) { + if (dead_marker_shadows_row(s, kind, marker)) { + tomb.apply(shadowable_tombstone(api::max_timestamp, gc_clock::time_point::max()), row_marker()); + } bool any_live = false; remove_if([&] (column_id id, atomic_cell_or_collection& c) { bool erase = false; @@ -1592,6 +1618,17 @@ bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone to return any_live; } +bool row::compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn& can_gc, + gc_clock::time_point gc_before) { + row_marker m; + return compact_and_expire(s, kind, tomb, query_time, can_gc, gc_before, m); +} + deletable_row deletable_row::difference(const schema& s, column_kind kind, const deletable_row& other) const { deletable_row dr; diff --git a/mutation_partition.hh b/mutation_partition.hh index 339755e400..d8a6798923 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -314,8 +314,22 @@ public: // Expires cells based on query_time. Expires tombstones based on gc_before // and max_purgeable. Removes cells covered by tomb. // Returns true iff there are any live cells left. - bool compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time, - can_gc_fn&, gc_clock::time_point gc_before); + bool compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn&, + gc_clock::time_point gc_before, + const row_marker& marker); + + bool compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn&, + gc_clock::time_point gc_before); row difference(const schema&, column_kind, const row& other) const; @@ -327,6 +341,8 @@ public: void prepare_hash(const schema& s, column_kind kind) const; + bool is_live(const schema&, column_kind kind, tombstone tomb = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const; + friend std::ostream& operator<<(std::ostream& os, const row& r); }; @@ -1030,7 +1046,7 @@ public: // Range tombstones will be trimmed to the boundaries of the clustering ranges. mutation_partition sliced(const schema& s, const query::clustering_row_ranges&) const; - // Returns true if there is no live data or tombstones. + // Returns true if the mutation_partition represents no writes. bool empty() const; public: deletable_row& clustered_row(const schema& s, const clustering_key& key); diff --git a/test.py b/test.py index c6f86e9ccc..0a9564a141 100755 --- a/test.py +++ b/test.py @@ -90,6 +90,7 @@ boost_tests = [ 'cell_locker_test', 'view_schema_test', 'view_build_test', + 'view_complex_test', 'clustering_ranges_walker_test', 'vint_serialization_test', 'duration_test', diff --git a/tests/view_complex_test.cc b/tests/view_complex_test.cc new file mode 100644 index 0000000000..55edc87cac --- /dev/null +++ b/tests/view_complex_test.cc @@ -0,0 +1,1193 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include + +#include "database.hh" +#include "db/view/view_builder.hh" +#include "sstables/compaction_manager.hh" + +#include "tests/test-utils.hh" +#include "tests/cql_test_env.hh" +#include "tests/cql_assertions.hh" + +#include "db/config.hh" + +using namespace std::literals::chrono_literals; + +// Requires on #3362 +#if 0 +void test_partial_delete_unselected_column(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, a int, b int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (p, c)").get(); + + e.execute_cql("update cf using timestamp 10 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(1)}, {int32_type->decompose(1)} }}); + }); + + e.execute_cql("delete b from cf using timestamp 11 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(1)}, {int32_type->decompose(1)} }}); + }); + + e.execute_cql("update cf using timestamp 18 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(1)}, {int32_type->decompose(1)} }}); + }); + + e.execute_cql("update cf using timestamp 20 set a = null where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 15").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_unselected_column_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_unselected_column(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_unselected_column_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_unselected_column(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} +#endif + +void test_partial_delete_selected_column(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, a int, b int, e int, f int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select a, b from cf " + "where p is not null and c is not null " + "primary key (p, c)").get(); + + e.execute_cql("update cf using timestamp 10 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("delete b from cf using timestamp 11 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + e.execute_cql("delete a from cf using timestamp 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 0").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 12 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("delete b from cf using timestamp 13 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("delete from cf using timestamp 14 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 15").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using ttl 3 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + forward_jump_clocks(4s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("delete from cf using timestamp 15 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + // removal generated by unselected column should not shadow selected column with smaller timestamp + e.execute_cql("update cf using timestamp 18 set e = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 18 set e = null where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 16 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_selected_column_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_selected_column(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_selected_column_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_selected_column(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_update_column_in_view_pk_with_ttl(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int primary key, a int, b int)").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and a is not null " + "primary key (a, p)").get(); + + e.execute_cql("update cf set a = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + }}); + }); + + e.execute_cql("delete a from cf where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p) values (1)").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using ttl 5 set a = 10 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(10)}, + {int32_type->decompose(1)}, + { }, + }}); + }); + + e.execute_cql("update cf set b = 100 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(10)}, + {int32_type->decompose(1)}, + {int32_type->decompose(100)} + }}); + }); + + forward_jump_clocks(6s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); +} + +SEASTAR_TEST_CASE(test_update_column_in_view_pk_with_ttl_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_column_in_view_pk_with_ttl(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_column_in_view_pk_with_ttl_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_column_in_view_pk_with_ttl(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +SEASTAR_TEST_CASE(test_unselected_column_can_preserve_ttld_row_maker) { + return do_with_cql_env_thread([] (auto& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("insert into cf (p, c) values (0, 0) using ttl 60").get(); + e.execute_cql("update cf using ttl 0 set v = 0 where p = 0 and c = 0").get(); + forward_jump_clocks(65s); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(0)}, {int32_type->decompose(0)}, }}); + }); + }); +} + +void test_update_column_not_in_view(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf using timestamp 0 set v1 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + e.execute_cql("delete v1 from cf using timestamp 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set v1 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 2 set v2 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + e.execute_cql("delete v1 from cf using timestamp 3 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + e.execute_cql("delete v2 from cf using timestamp 4 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using ttl 3 set v2 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + forward_jump_clocks(3s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf set v2 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + assert_that_failed(e.execute_cql("alter table cf drop v2;")); +} + +SEASTAR_TEST_CASE(test_update_column_not_in_view_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_column_not_in_view(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_column_not_in_view_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_column_not_in_view(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_partial_update_with_unselected_collections(cql_test_env& e, std::function&& maybe_flush) { +e.execute_cql("create table cf (p int, c int, a int, b int, l list, s set, m map, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select a, b from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf set l=l+[1,2,3] where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf set l=l-[1,2] where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf set b = 3 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(3)} + }}); + }); + + e.execute_cql("update cf set b=null, l=l-[3], s=s-{3} where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf set m=m+{3:3}, l=l-[1], s=s-{2} where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + assert_that_failed(e.execute_cql("alter table cf drop m;")); +} + +SEASTAR_TEST_CASE(test_partial_update_with_unselected_collections_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partial_update_with_unselected_collections(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partial_update_with_unselected_collections_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partial_update_with_unselected_collections(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_unselected_columns_ttl(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("insert into cf (p, c) values (1, 1) using ttl 3").get(); + e.execute_cql("update cf using ttl 1000 set v = 0 where p = 1 and c = 1").get(); + maybe_flush(); + + forward_jump_clocks(4s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("delete v from cf where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1)").get(); + e.execute_cql("update cf using ttl 3 set v = 0 where p = 1 and c = 1").get(); + e.execute_cql("insert into cf (p, c) values (3, 3) using ttl 3").get(); + + forward_jump_clocks(4s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + msg = e.execute_cql("select * from vcf where p = 3 and c = 3").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf set v = 0 where p = 3 and c = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 3 and c = 3").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(3)}, + {int32_type->decompose(3)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_unselected_columns_ttl_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_unselected_columns_ttl(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_unselected_columns_ttl_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_unselected_columns_ttl(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_partition_deletion(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, a int, b int, c int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and a is not null " + "primary key (p, a)").get(); + + e.execute_cql("insert into cf (p, a, b, c) values (1, 1, 1, 1) using timestamp 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("update cf using timestamp 1 set a = null where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("delete from cf using timestamp 2 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 3 set a = 1, b = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); +} + +SEASTAR_TEST_CASE(test_partition_deletion_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partition_deletion(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partition_deletion_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partition_deletion(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_commutative_row_deletion(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, v1 int, v2 int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and v1 is not null " + "primary key (v1, p)").get(); + + e.execute_cql("insert into cf (p, v1, v2) values (3, 1, 3) using timestamp 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(3)}, + {long_type->decompose(1L)} + }}); + }); + + e.execute_cql("delete from cf using timestamp 2 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, v1) values (3, 1) using timestamp 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 4 set v1 = 2 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(2)}, + {int32_type->decompose(3)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 5 set v1 = 1 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + { }, + { } + }}); + }); + + e.local_db().get_compaction_manager().submit_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get(); +} + +SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_commutative_row_deletion(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_commutative_row_deletion_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_commutative_row_deletion(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +SEASTAR_TEST_CASE(test_unselected_column_with_expired_marker) { + return do_with_cql_env_thread([] (auto& e) { + e.execute_cql("create table cf (p int, c int, a int, b int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c, b from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf set a = 1 where p = 1 and c = 1").get(); + e.local_db().flush_all_memtables().get(); + e.execute_cql("insert into cf (p, c) values (1, 1) using ttl 5").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + forward_jump_clocks(6s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + e.execute_cql("update cf set a = null where p = 1 and c = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set b = 1 where p = 1 and c = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + }); +} + +void test_update_with_column_timestamp_smaller_than_pk(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, v1 int, v2 int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and v1 is not null " + "primary key (v1, p)").get(); + + e.execute_cql("insert into cf (p, v1, v2) values (3, 1, 3) using timestamp 6").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); + + e.execute_cql("insert into cf (p) values (3) using timestamp 20").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); + + e.execute_cql("update cf using timestamp 7 set v1 = 2 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(2)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); + + e.execute_cql("update cf using timestamp 8 set v1 = 1 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_smaller_than_pk_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_smaller_than_pk(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_smaller_than_pk_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_smaller_than_pk(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_expired_marker_with_limit(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, a int, b int, primary key (p))").get(); + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and a is not null " + "primary key (p, a)").get(); + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and a is not null " + "primary key (a, p)").get(); + + for (int i = 1; i <= 100; i++) { + e.execute_cql(sprint("insert into cf (p, a, b) values (%d, %d, %d)", i, i, i)).get(); + } + for (int i = 1; i <= 100; i++) { + if (i % 50 != 0) { + e.execute_cql(sprint("delete a from cf where p = %d", i)).get(); + } + } + + maybe_flush(); + + for (auto view : {"vcf1", "vcf2"}) { + eventually([&] { + auto msg = e.execute_cql(sprint("select * from %s limit 1", view)).get0(); + assert_that(msg).is_rows().with_size(1); + msg = e.execute_cql(sprint("select * from %s limit 2", view)).get0(); + assert_that(msg).is_rows().with_size(2); + msg = e.execute_cql(sprint("select * from %s", view)).get0(); + assert_that(msg).is_rows().with_rows({ + {{int32_type->decompose(50)}, {int32_type->decompose(50)}, {int32_type->decompose(50)}}, + {{int32_type->decompose(100)}, {int32_type->decompose(100)}, {int32_type->decompose(100)}}, + }); + + }); + } +} + +SEASTAR_TEST_CASE(test_expired_marker_with_limit_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_expired_marker_with_limit(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_expired_marker_with_limit_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_expired_marker_with_limit(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, a int, b int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and a is not null " + "primary key (p, a)").get(); + + e.execute_cql("delete from cf using timestamp 0 where p = 1").get(); + maybe_flush(); + + e.execute_cql("insert into cf (p, a, b) values (1, 1, 1) using timestamp 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("update cf using timestamp 10 set b = 2 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(2)} + }}); + }); + + e.execute_cql("update cf using timestamp 2 set a = 2 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(2)}, + {int32_type->decompose(2)} + }}); + }); + + e.local_db().get_compaction_manager().submit_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(2)}, + {int32_type->decompose(2)} + }}); + }); + + e.execute_cql("update cf using timestamp 11 set a = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(2)} + }}); + }); + + e.execute_cql("update cf using timestamp 12 set a = null where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 13 set a = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(2)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_bigger_than_pk_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_bigger_than_pk(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_bigger_than_pk_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_bigger_than_pk(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_no_regular_base_column_in_view_pk(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf using timestamp 1 set v1 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 2 set v1 = null, v2 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("update cf using timestamp 2 set v2 = null where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("delete from cf using timestamp 4 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 5 set v2 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_no_regular_base_column_in_view_pk_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_no_regular_base_column_in_view_pk(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_no_regular_base_column_in_view_pk_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_no_regular_base_column_in_view_pk(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +SEASTAR_TEST_CASE(test_shadowing_row_marker) { + return do_with_cql_env_thread([] (auto& e) { + e.execute_cql("create table cf (p int, v1 int, v2 int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and v1 is not null " + "primary key (v1, p)").get(); + + e.execute_cql("insert into cf (p, v1, v2) values (1, 1, 1)").get(); + + e.execute_cql("update cf set v1 = null where p = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using ttl 5 set v1 = 1 where p = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + forward_jump_clocks(6s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + }); +} + +void test_marker_timestamp_is_not_shadowed_by_previous_update(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c, v1 from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("insert into cf (p, c, v1, v2) VALUES(1, 1, 1, 1) using ttl 5").get(); + maybe_flush(); + e.execute_cql("update cf using ttl 1000 set v2 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + e.execute_cql("delete v2 from cf where p = 1 and c = 1").get(); + maybe_flush(); + forward_jump_clocks(6s); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); +} + +SEASTAR_TEST_CASE(test_marker_timestamp_is_not_shadowed_by_previous_update_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_marker_timestamp_is_not_shadowed_by_previous_update(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_marker_timestamp_is_not_shadowed_by_previous_updatewith_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_marker_timestamp_is_not_shadowed_by_previous_update(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} diff --git a/tests/view_schema_test.cc b/tests/view_schema_test.cc index 5b8abc0f97..4090cde165 100644 --- a/tests/view_schema_test.cc +++ b/tests/view_schema_test.cc @@ -1292,27 +1292,6 @@ SEASTAR_TEST_CASE(test_update) { }); } -SEASTAR_TEST_CASE(test_ignore_update) { - return do_with_cql_env_thread([] (auto& e) { - e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c));").get(); - e.execute_cql("create materialized view mv as select p, c, v1 from cf " - "where p is not null and c is not null and v1 is not null primary key (c, p)").get(); - - e.execute_cql("insert into cf (p, c, v2) values (0, 0, 0)").get(); - eventually([&] { - auto msg = e.execute_cql("select * from mv").get0(); - assert_that(msg).is_rows() - .with_size(1); - }); - e.execute_cql("update cf set v2 = 3 where p = 1 and c = 1").get(); - eventually([&] { - auto msg = e.execute_cql("select * from mv").get0(); - assert_that(msg).is_rows() - .with_size(1); - }); - }); -} - SEASTAR_TEST_CASE(test_ttl) { return do_with_cql_env_thread([] (auto& e) { e.execute_cql("create table cf (p int, c int, v1 int, v2 int, v3 int, primary key (p, c));").get(); @@ -2935,6 +2914,30 @@ void complex_timestamp_with_base_pk_columns_in_view_pk_deletion_test(cql_test_en assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(2)}, {int32_type->decompose(1)}, {}, {} }}); }); + // Reset values TS=10 + e.execute_cql("insert into cf (p, c, v1, v2) values (1, 2, 3, 4) using timestamp 10").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf where p = 1 and c = 2").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(3)}, {int32_type->decompose(4)}, {long_type->decompose(10L)} }}); + }); + + // Update values TS=20 + e.execute_cql("update cf using timestamp 20 set v2 = 5 where p = 1 and c = 2").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf where p = 1 and c = 2").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(3)}, {int32_type->decompose(5)}, {long_type->decompose(20L)} }}); + }); + + // Delete row TS=10 + e.execute_cql("delete from cf using timestamp 10 where p = 1 and c = 2").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ { }, {int32_type->decompose(5)}, {long_type->decompose(20L)} }}); + }); + e.execute_cql("drop materialized view vcf").get(); e.execute_cql("drop table cf").get(); } diff --git a/view_info.hh b/view_info.hh index b3a832b9c3..f060c0f5f9 100644 --- a/view_info.hh +++ b/view_info.hh @@ -33,8 +33,8 @@ class view_info final { mutable shared_ptr _select_statement; mutable stdx::optional _partition_slice; mutable stdx::optional _partition_ranges; - // Lazily initializes the column id of a regular base table included in the view's PK, if any. - mutable stdx::optional> _base_non_pk_column_in_view_pk; + // Id of a regular base table column included in the view's PK, if any. + mutable stdx::optional _base_non_pk_column_in_view_pk; public: view_info(const schema& schema, const raw_view_info& raw_view_info); @@ -62,7 +62,9 @@ public: const query::partition_slice& partition_slice() const; const dht::partition_range_vector& partition_ranges() const; const column_definition* view_column(const schema& base, column_id base_id) const; - stdx::optional base_non_pk_column_in_view_pk(const schema& base) const; + const column_definition* view_column(const column_definition& base_def) const; + stdx::optional base_non_pk_column_in_view_pk() const; + void initialize_base_dependent_fields(const schema& base); friend bool operator==(const view_info& x, const view_info& y) { return x._raw == y._raw;