From b77b71436d576ca49bdab32ee9b71bc2f609a4b3 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:05:08 +0100 Subject: [PATCH 01/14] cql3/alter_table_statement: Forbid dropping columns of MV base tables When a view's PK only contains the columns that form the base's PK, then the liveness of a particular view row is determined not only by the base row's marker, but also by the selected and, more importantly, unselected columns. The fact that unselected columns can keep a view row alive also requires that users cannot drop columns of base tables with materialized views, which this patch implements. Refs #3362 Signed-off-by: Duarte Nunes --- cql3/statements/alter_table_statement.cc | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cql3/statements/alter_table_statement.cc b/cql3/statements/alter_table_statement.cc index 34d35d7247..f1e771e4d4 100644 --- a/cql3/statements/alter_table_statement.cc +++ b/cql3/statements/alter_table_statement.cc @@ -305,14 +305,10 @@ future> alter_table_statement::a } } - // If a column is dropped which is included in a view, we don't allow the drop to take place. 
- auto view_names = ::join(", ", cf.views() - | boost::adaptors::filtered([&] (auto&& v) { return bool(v->get_column_definition(column_name->name())); }) - | boost::adaptors::transformed([] (auto&& v) { return v->cf_name(); })); - if (!view_names.empty()) { + if (!cf.views().empty()) { throw exceptions::invalid_request_exception(sprint( - "Cannot drop column %s, depended on by materialized views (%s.{%s})", - column_name, keyspace(), view_names)); + "Cannot drop column %s on base table %s.%s with materialized views", + column_name, keyspace(), column_family())); } break; } From 31370fd7b1ad8ea01407ff6acb687059f78e3cdd Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:10:55 +0100 Subject: [PATCH 02/14] view_info: Explicitly initialize base-dependent fields Instead of lazily-initializing the regular base column in the view's PK field, explicitly initialize it. This will be used by future patches that don't have access to the schema when wanting to obtain that column. Signed-off-by: Duarte Nunes --- database.cc | 1 + db/view/view.cc | 27 ++++++++++++++------------- view_info.hh | 7 ++++--- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/database.cc b/database.cc index c4e7b602bb..e348931672 100644 --- a/database.cc +++ b/database.cc @@ -4275,6 +4275,7 @@ static std::vector::iterator find_view(std::vector& views, c } void column_family::add_or_update_view(view_ptr v) { + v->view_info()->initialize_base_dependent_fields(*schema()); auto existing = find_view(_views, v); if (existing != _views.end()) { *existing = std::move(v); diff --git a/db/view/view.cc b/db/view/view.cc index 516e7cb9e0..0b5cc5bdd5 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -114,18 +114,19 @@ const column_definition* view_info::view_column(const schema& base, column_id ba return _schema.get_column_definition(base.regular_column_at(base_id).name()); } -stdx::optional view_info::base_non_pk_column_in_view_pk(const schema& base) const { - if 
(!_base_non_pk_column_in_view_pk) { - _base_non_pk_column_in_view_pk.emplace(stdx::nullopt); - for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) { - auto* base_col = base.get_column_definition(view_col.name()); - if (!base_col->is_primary_key()) { - _base_non_pk_column_in_view_pk.emplace(base_col->id); - break; - } + +stdx::optional view_info::base_non_pk_column_in_view_pk() const { + return _base_non_pk_column_in_view_pk; +} + +void view_info::initialize_base_dependent_fields(const schema& base) { + for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) { + auto* base_col = base.get_column_definition(view_col.name()); + if (!base_col->is_primary_key()) { + _base_non_pk_column_in_view_pk.emplace(base_col->id); + break; } } - return *_base_non_pk_column_in_view_pk; } namespace db { @@ -178,7 +179,7 @@ static bool update_requires_read_before_write(const schema& base, // However, if the view has restrictions on regular columns, then a write that doesn't match those filters // needs to add a tombstone (assuming a previous update matched those filter and created a view entry); for // now we just do a read-before-write in that case. - if (!vf.base_non_pk_column_in_view_pk(base) + if (!vf.base_non_pk_column_in_view_pk() && vf.select_statement().get_restrictions()->get_non_pk_restriction().empty()) { continue; } @@ -291,7 +292,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons */ auto marker = base_row.marker(); - auto col_id = _view_info.base_non_pk_column_in_view_pk(*_base); + auto col_id = _view_info.base_non_pk_column_in_view_pk(); if (col_id) { // Note: multi-cell columns can't be part of the primary key. 
auto cell = base_row.cells().cell_at(*col_id).as_atomic_cell(); @@ -463,7 +464,7 @@ void view_updates::generate_update( return; } - auto col_id = _view_info.base_non_pk_column_in_view_pk(*_base); + auto col_id = _view_info.base_non_pk_column_in_view_pk(); if (!col_id) { // The view key is necessarily the same pre and post update. if (existing && !existing->empty()) { diff --git a/view_info.hh b/view_info.hh index b3a832b9c3..9846c5bcab 100644 --- a/view_info.hh +++ b/view_info.hh @@ -33,8 +33,8 @@ class view_info final { mutable shared_ptr _select_statement; mutable stdx::optional _partition_slice; mutable stdx::optional _partition_ranges; - // Lazily initializes the column id of a regular base table included in the view's PK, if any. - mutable stdx::optional> _base_non_pk_column_in_view_pk; + // Id of a regular base table column included in the view's PK, if any. + mutable stdx::optional _base_non_pk_column_in_view_pk; public: view_info(const schema& schema, const raw_view_info& raw_view_info); @@ -62,7 +62,8 @@ public: const query::partition_slice& partition_slice() const; const dht::partition_range_vector& partition_ranges() const; const column_definition* view_column(const schema& base, column_id base_id) const; - stdx::optional base_non_pk_column_in_view_pk(const schema& base) const; + stdx::optional base_non_pk_column_in_view_pk() const; + void initialize_base_dependent_fields(const schema& base); friend bool operator==(const view_info& x, const view_info& y) { return x._raw == y._raw; From 164f043768588492c5b274e9c5e1bd6461b06e3e Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:13:01 +0100 Subject: [PATCH 03/14] view_info: Add view_column() overload For when we already have the base's column_definition. 
Signed-off-by: Duarte Nunes --- db/view/view.cc | 5 ++++- view_info.hh | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/db/view/view.cc b/db/view/view.cc index 0b5cc5bdd5..c759b393bf 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -111,9 +111,12 @@ const column_definition* view_info::view_column(const schema& base, column_id ba // FIXME: Map base column_ids to view_column_ids, which can be something like // a boost::small_vector where the position is the base column_id, and the // value is either empty or the view's column_id. - return _schema.get_column_definition(base.regular_column_at(base_id).name()); + return view_column(base.regular_column_at(base_id)); } +const column_definition* view_info::view_column(const column_definition& base_def) const { + return _schema.get_column_definition(base_def.name()); +} stdx::optional view_info::base_non_pk_column_in_view_pk() const { return _base_non_pk_column_in_view_pk; diff --git a/view_info.hh b/view_info.hh index 9846c5bcab..f060c0f5f9 100644 --- a/view_info.hh +++ b/view_info.hh @@ -62,6 +62,7 @@ public: const query::partition_slice& partition_slice() const; const dht::partition_range_vector& partition_ranges() const; const column_definition* view_column(const schema& base, column_id base_id) const; + const column_definition* view_column(const column_definition& base_def) const; stdx::optional base_non_pk_column_in_view_pk() const; void initialize_base_dependent_fields(const schema& base); From b0cb5480d5a7cb3dc9b6506beba09c8c419ee690 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:31:00 +0100 Subject: [PATCH 04/14] mutation_fragment: Allow querying if row is live For clustering_row and static_row, allow querying whether they are live or not. 
Signed-off-by: Duarte Nunes --- mutation_fragment.hh | 9 +++++++++ mutation_partition.cc | 8 ++++++-- mutation_partition.hh | 2 ++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/mutation_fragment.hh b/mutation_fragment.hh index 9ddcc13b61..aadca81999 100644 --- a/mutation_fragment.hh +++ b/mutation_fragment.hh @@ -75,6 +75,11 @@ public: return !_t && _marker.is_missing() && _cells.empty(); } + bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const { + base_tombstone.apply(_t.tomb()); + return _marker.is_live(base_tombstone, now) || _cells.is_live(s, column_kind::regular_column, base_tombstone, now); + } + void apply(const schema& s, clustering_row&& cr) { _marker.apply(std::move(cr._marker)); _t.apply(cr._t, _marker); @@ -138,6 +143,10 @@ public: return _cells.empty(); } + bool is_live(const schema& s, gc_clock::time_point now = gc_clock::time_point::min()) const { + return _cells.is_live(s, column_kind::static_column, tombstone(), now); + } + void apply(const schema& s, const row& r) { _cells.apply(s, column_kind::static_column, r); } diff --git a/mutation_partition.cc b/mutation_partition.cc index 61ef9da90b..466f0d3039 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1335,8 +1335,12 @@ deletable_row::is_live(const schema& s, tombstone base_tombstone, gc_clock::time // row is live. Otherwise, a row is considered live if it has any cell // which is live. 
base_tombstone.apply(_deleted_at.tomb()); - return _marker.is_live(base_tombstone, query_time) - || has_any_live_data(s, column_kind::regular_column, _cells, base_tombstone, query_time); + return _marker.is_live(base_tombstone, query_time) || _cells.is_live(s, column_kind::regular_column, base_tombstone, query_time); +} + +bool +row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clock::time_point query_time) const { + return has_any_live_data(s, kind, *this, base_tombstone, query_time); } bool diff --git a/mutation_partition.hh b/mutation_partition.hh index 339755e400..663e899b43 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -327,6 +327,8 @@ public: void prepare_hash(const schema& s, column_kind kind) const; + bool is_live(const schema&, column_kind kind, tombstone tomb = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const; + friend std::ostream& operator<<(std::ostream& os, const row& r); }; From e6467f46b776ce146bb58c1c21b0b83d06fbab89 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:29:21 +0100 Subject: [PATCH 05/14] tests/view_schema_test: Remove unneeded test Signed-off-by: Duarte Nunes --- tests/view_schema_test.cc | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/tests/view_schema_test.cc b/tests/view_schema_test.cc index 3f94a9eacc..6f38f9a0e7 100644 --- a/tests/view_schema_test.cc +++ b/tests/view_schema_test.cc @@ -1292,27 +1292,6 @@ SEASTAR_TEST_CASE(test_update) { }); } -SEASTAR_TEST_CASE(test_ignore_update) { - return do_with_cql_env_thread([] (auto& e) { - e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c));").get(); - e.execute_cql("create materialized view mv as select p, c, v1 from cf " - "where p is not null and c is not null and v1 is not null primary key (c, p)").get(); - - e.execute_cql("insert into cf (p, c, v2) values (0, 0, 0)").get(); - eventually([&] { - auto msg = e.execute_cql("select * from 
mv").get0(); - assert_that(msg).is_rows() - .with_size(1); - }); - e.execute_cql("update cf set v2 = 3 where p = 1 and c = 1").get(); - eventually([&] { - auto msg = e.execute_cql("select * from mv").get0(); - assert_that(msg).is_rows() - .with_size(1); - }); - }); -} - SEASTAR_TEST_CASE(test_ttl) { return do_with_cql_env_thread([] (auto& e) { e.execute_cql("create table cf (p int, c int, v1 int, v2 int, v3 int, primary key (p, c));").get(); From ac9b93eb89bf4514480395c39030c8b306fd4142 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:27:33 +0100 Subject: [PATCH 06/14] db/view: Consider partition tombstone when generating updates Not adding the partition tombstone to the current list of tombstones may cause updates to be incorrectly generated. Signed-off-by: Duarte Nunes --- db/view/view.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index c759b393bf..5b4c57cf95 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -617,9 +617,7 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona } static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clustering_row& row) { - for (auto&& rt : tracker.range_tombstones_for_row(row.key())) { - row.apply(rt.tomb); - } + row.apply(tracker.tombstone_for_row(row.key())); } future view_update_builder::on_results() { From bd3cedd2402ca2a1bf43526bbf2a08cb6f5a7539 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:15:16 +0100 Subject: [PATCH 07/14] db/view: Process base updates to column unselected by its views When a view's PK only contains the columns that form the base's PK, then the liveness of a particular view row is determined not only by the base row's marker, but also by the selected and, more importantly, unselected columns. So, process base updates to columns unselected by any of its views. 
Refs #3362 Signed-off-by: Duarte Nunes --- db/view/view.cc | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 5b4c57cf95..cdaf18af72 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -151,25 +151,7 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de // - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns: // even if an update don't match a view condition on a regular column, that update can still invalidate a // pre-existing entry) - note that the upper layers should already have checked the partition key; - // - the update doesn't modify any of the columns impacting the view (where "impacting" the view means that column - // is neither included in the view, nor used by the view filter). - if (!clustering_prefix_matches(base, view, key.key(), update.key())) { - return false; - } - - // We want to check if the update modifies any of the columns that are part of the view (in which case the view is - // affected). But iff the view includes all the base table columns, or the update has either a row deletion or a - // row marker, we know the view is affected right away. 
- if (view.include_all_columns() || update.row().deleted_at() || update.row().marker().is_live()) { - return true; - } - - bool affected = false; - update.row().cells().for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& cell) { - affected = view.view_column(base, id); - return stop_iteration(affected); - }); - return affected; + return clustering_prefix_matches(base, view, key.key(), update.key()); } static bool update_requires_read_before_write(const schema& base, From 4dfce4d369d132710e04c20bae3d0ec062bd6930 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:16:27 +0100 Subject: [PATCH 08/14] db/view: Don't avoid read-before-write when view PK matches base When a view's PK only contains the columns that form the base's PK, then the liveness of a particular view row is determined not only by the base row's marker, but also by the selected and, more importantly, unselected columns. When calculating the view's row marker we need to access those unselected columns, so we can't avoid the read-before-write as we were doing. Refs #3362 Signed-off-by: Duarte Nunes --- db/view/view.cc | 8 -------- 1 file changed, 8 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index cdaf18af72..f7eadc4e86 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -160,14 +160,6 @@ static bool update_requires_read_before_write(const schema& base, const rows_entry& update) { for (auto&& v : views) { view_info& vf = *v->view_info(); - // A view whose primary key contains only the base's primary key columns doesn't require a read-before-write. - // However, if the view has restrictions on regular columns, then a write that doesn't match those filters - // needs to add a tombstone (assuming a previous update matched those filter and created a view entry); for - // now we just do a read-before-write in that case. 
- if (!vf.base_non_pk_column_in_view_pk() - && vf.select_statement().get_restrictions()->get_non_pk_restriction().empty()) { - continue; - } if (may_be_affected_by(base, vf, key, update)) { return true; } From 67dac67c46e9875979f7ce07e16af2b91a438569 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:32:20 +0100 Subject: [PATCH 09/14] mutation_partition: Regular base column in view determines row liveness When views contain a primary key column that is not part of the base table primary key, that column determines whether the row is live or not. We need to ensure that when that cell is dead, and thus the derived row marker, either by normal deletion of by TTL, so is the rest of the row. This patch introduces the idea of shawdowing row marker. We map the status of the regular base column in the view's PK to the view row's marker. If this marker is dead, so is that cell in the base table, and so should the view row become. To enforce that, a view row's dead marker shadows the whole row if that view includes a base regular column in its PK. Fixes #3360 Signed-off-by: Duarte Nunes --- db/view/view.cc | 4 ++-- mutation_compactor.hh | 2 +- mutation_partition.cc | 41 +++++++++++++++++++++++++++++++++++++---- mutation_partition.hh | 18 ++++++++++++++++-- 4 files changed, 56 insertions(+), 9 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index f7eadc4e86..dbc4be0c13 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -578,12 +578,12 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona // We allow existing to be disengaged, which we treat the same as an empty row. 
if (existing) { existing->marker().compact_and_expire(tombstone(), _now, always_gc, gc_before); - existing->cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before); + existing->cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before, existing->marker()); update.apply(*_schema, *existing); } update.marker().compact_and_expire(tombstone(), _now, always_gc, gc_before); - update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before); + update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before, update.marker()); for (auto&& v : _view_updates) { v.generate_update(_key, update, existing, _now); diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 712f5db039..9304e8c2db 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -215,7 +215,7 @@ public: } t.apply(current_tombstone); bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before); - is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before); + is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker()); if (only_live() && is_live) { partition_is_not_empty(consumer); auto stop = consumer.consume(std::move(cr), t, true); diff --git a/mutation_partition.cc b/mutation_partition.cc index 466f0d3039..aad405bca5 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -35,6 +35,7 @@ #include "intrusive_set_external_comparator.hh" #include "counters.hh" #include "row_cache.hh" +#include "view_info.hh" #include template @@ -1264,8 +1265,8 @@ uint32_t mutation_partition::do_compact(const schema& s, deletable_row& row = e.row(); row_tombstone tomb = tombstone_for_row(s, e); - bool is_live = 
row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before); - is_live |= row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before); + bool is_live = row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before); + is_live |= row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before, row.marker()); if (should_purge_row_tombstone(row.deleted_at())) { row.remove_tombstone(); @@ -1552,9 +1553,30 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) { }); } -bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time, - can_gc_fn& can_gc, gc_clock::time_point gc_before) +// When views contain a primary key column that is not part of the base table primary key, +// that column determines whether the row is live or not. We need to ensure that when that +// cell is dead, and thus the derived row marker, either by normal deletion of by TTL, so +// is the rest of the row. To ensure that none of the regular columns keep the row alive, +// we erase the live cells according to the shadowable_tombstone rules. 
+static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) { + return s.is_view() + && s.view_info()->base_non_pk_column_in_view_pk() + && !marker.is_live() + && kind == column_kind::regular_column; // not applicable to static rows +} + +bool row::compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn& can_gc, + gc_clock::time_point gc_before, + const row_marker& marker) { + if (dead_marker_shadows_row(s, kind, marker)) { + tomb.apply(shadowable_tombstone(api::max_timestamp, gc_clock::time_point::max()), row_marker()); + } bool any_live = false; remove_if([&] (column_id id, atomic_cell_or_collection& c) { bool erase = false; @@ -1596,6 +1618,17 @@ bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone to return any_live; } +bool row::compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn& can_gc, + gc_clock::time_point gc_before) { + row_marker m; + return compact_and_expire(s, kind, tomb, query_time, can_gc, gc_before, m); +} + deletable_row deletable_row::difference(const schema& s, column_kind kind, const deletable_row& other) const { deletable_row dr; diff --git a/mutation_partition.hh b/mutation_partition.hh index 663e899b43..506f710245 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -314,8 +314,22 @@ public: // Expires cells based on query_time. Expires tombstones based on gc_before // and max_purgeable. Removes cells covered by tomb. // Returns true iff there are any live cells left. 
- bool compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time, - can_gc_fn&, gc_clock::time_point gc_before); + bool compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn&, + gc_clock::time_point gc_before, + const row_marker& marker); + + bool compact_and_expire( + const schema& s, + column_kind kind, + row_tombstone tomb, + gc_clock::time_point query_time, + can_gc_fn&, + gc_clock::time_point gc_before); row difference(const schema&, column_kind, const row& other) const; From 4b4d1dbd1f295a513fde285640b5e8b06c86ae64 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:21:43 +0100 Subject: [PATCH 10/14] db/view: Handle unselected base columns and corner cases When a view's PK only contains the columns that form the base's PK, then the liveness of a particular view row is determined not only by the base row's marker, but also by the selected and, more importantly, unselected columns. This patch ensures that unselected columns are considered as much as possible, even though some limitations will still exist. In particular, we need to represent multiple timestamps (from all the unselected columns), but have only mechanisms to record a single timestamp. We also have some issues when dealing with selected column, and the way we currently delete them. Consider the following: create table cf (p int, c int, a int, b int, primary key (p, c)) create materialized view vcf as select a, b from cf where p is not null and c is not null primary key (p, c) 1) update cf using timestamp 10 set a = 1 where p = 1 and c = 1 2) delete a from cf using timestamp 11 where p = 1 and c = 1 3) update cf using timestamp 1 set a = 2 where p = 1 and c = 1 After 1), the MV should include a row with row marker @ ts10, p = 1, c = 1, a = 1. After 2), this row should be removed. 
At 3), we should add a row with row marker @ ts1, p = 1, c = 1, a = 1, with a lower timestamp. This means that the delete should not insert a row tombstone with timestamp @ 11, as we do now but it should just delete the view's row marker (which exists) with ts1. Refs #3362 Fixes #3140 Fixes #3361 Signed-off-by: Duarte Nunes --- db/view/view.cc | 158 ++++++++++++++++++++++++++++-------------------- 1 file changed, 93 insertions(+), 65 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index dbc4be0c13..e5cd58e050 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -233,12 +233,12 @@ private: row_marker compute_row_marker(const clustering_row& base_row) const; deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update); void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); - void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now); - void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now); + void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); + void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now); void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { create_entry(base_key, update, now); - delete_old_entry(base_key, existing, row_tombstone(), now); + delete_old_entry(base_key, existing, update, now); } }; @@ -246,12 +246,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons /* * We need to 
compute both the timestamp and expiration. * - * For the timestamp, it makes sense to use the bigger timestamp for all view PK columns. - * - * This is more complex for the expiration. We want to maintain consistency between the base and the view, so the - * entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part - * of the view PK. - * Which means we really have 2 cases: + * There are 3 cases: * 1) There is a column that is not in the base PK but is in the view PK. In that case, as long as that column * lives, the view entry does too, but as soon as it expires (or is deleted for that matter) the entry also * should expire. So the expiration for the view is the one of that column, regardless of any other expiration. @@ -262,10 +257,13 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons * UPDATE t SET c = 0 WHERE a = 0 AND b = 0; * then even after 3 seconds elapsed, the row will still exist (it just won't have a "row marker" anymore) and so * the MV should still have a corresponding entry. - * 2) The columns for the base and view PKs are exactly the same. In that case, the view entry should live - * as long as the base row lives. This means the view entry should only expire once *everything* in the - * base row has expired. So, the row TTL should be the max of any other TTL. This is particularly important - * in the case where the base row has a TTL, but a column *absent* from the view holds a greater TTL. + * This cell determines the liveness of the view row. + * 2) The columns for the base and view PKs are exactly the same, and all base columns are selected by the view. + * In that case, all components (marker, deletion and cells) are the same and trivially mapped. + * 3) The columns for the base and view PKs are exactly the same, but some base columns are not selected in the view. 
+ * Use the max timestamp out of the base row marker and all the unselected columns - this ensures we can keep the + * view row alive. Do the same thing for the expiration, if the marker is dead or will expire, and so + * will all unselected columns. */ auto marker = base_row.marker(); @@ -273,27 +271,39 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons if (col_id) { // Note: multi-cell columns can't be part of the primary key. auto cell = base_row.cells().cell_at(*col_id).as_atomic_cell(); - auto timestamp = std::max(marker.timestamp(), cell.timestamp()); - return cell.is_live_and_has_ttl() ? row_marker(timestamp, cell.ttl(), cell.expiry()) : row_marker(timestamp); + return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp()); } - if (!marker.is_expiring()) { + if (_view_info.include_all_columns()) { return marker; } - auto ttl = marker.ttl(); - auto expiry = marker.expiry(); + auto timestamp = marker.timestamp(); + bool has_non_expiring_live_cell = false; + expiry_opt biggest_expiry; + gc_clock::duration ttl = gc_clock::duration::min(); + if (marker.is_expiring()) { + biggest_expiry = marker.expiry(); + ttl = marker.ttl(); + } auto maybe_update_expiry_and_ttl = [&] (atomic_cell_view&& cell) { - // Note: Cassandra compares cell.ttl() here, but that seems very wrong. - // See CASSANDRA-13127. - if (cell.is_live_and_has_ttl() && cell.expiry() > expiry) { - expiry = cell.expiry(); - ttl = cell.ttl(); + timestamp = std::max(timestamp, cell.timestamp()); + if (cell.is_live_and_has_ttl()) { + if (cell.expiry() >= biggest_expiry.value_or(cell.expiry())) { + biggest_expiry = cell.expiry(); + ttl = cell.ttl(); + } + } else if (cell.is_live()) { + has_non_expiring_live_cell = true; } }; + // Iterate over regular cells not in the view, as we already have the timestamps of the included columns. 
base_row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) { auto& def = _base->regular_column_at(id); + if (_view_info.view_column(def)) { + return; + } if (def.is_atomic()) { maybe_update_expiry_and_ttl(c.as_atomic_cell()); } else { @@ -301,7 +311,13 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons } }); - return row_marker(marker.timestamp(), ttl, expiry); + if ((marker.is_live() && !marker.is_expiring()) || has_non_expiring_live_cell) { + return row_marker(timestamp); + } + if (biggest_expiry) { + return row_marker(timestamp, ttl, *biggest_expiry); + } + return marker; } deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) { @@ -351,7 +367,8 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ return; } deletable_row& r = get_view_row(base_key, update); - r.apply(compute_row_marker(update)); + auto marker = compute_row_marker(update); + r.apply(marker); r.apply(update.tomb()); add_cells_to_view(*_base, *_view, update.cells(), r.cells()); } @@ -360,41 +377,51 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ * Deletes the view entry corresponding to the provided base row. * This method checks that the base row does match the view filter before bothering. 
*/ -void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) { +void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) { // Before deleting an old entry, make sure it was matching the view filter // (otherwise there is nothing to delete) if (!is_partition_key_empty(*_base, *_view, base_key, existing) && matches_view_filter(*_base, _view_info, base_key, existing, now)) { - do_delete_old_entry(base_key, existing, t, now); + do_delete_old_entry(base_key, existing, update, now); } } -void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) { - if (t) { - get_view_row(base_key, existing).apply(t); - return; +void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) { + auto& r = get_view_row(base_key, existing); + auto col_id = _view_info.base_non_pk_column_in_view_pk(); + if (col_id) { + // We delete the old row using a shadowable row tombstone, making sure that + // the tombstone deletes everything in the row (or it might still show up). + // Note: multi-cell columns can't be part of the primary key. 
+ auto cell = existing.cells().cell_at(*col_id).as_atomic_cell(); + if (cell.is_live()) { + r.apply(shadowable_tombstone(cell.timestamp(), now)); + } + } else { + auto ts = existing.marker().timestamp(); + auto set_max_ts = [&ts] (atomic_cell_view&& cell) { + ts = std::max(ts, cell.timestamp()); + }; + if (!_view_info.include_all_columns()) { + existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) { + auto& def = _base->regular_column_at(id); + if (_view_info.view_column(def)) { + return; + } + // Unselected columns are used regardless of being live or dead, since we don't know if + // they were used to compute the view entry's row marker. + if (def.is_atomic()) { + set_max_ts(cell.as_atomic_cell()); + } else { + collection_type_impl::for_each_cell(cell.as_collection_mutation(), set_max_ts); + } + }); + } + auto marker = row_marker(tombstone(ts, now)); + r.apply(marker); + auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); + add_cells_to_view(*_base, *_view, diff, r.cells()); } - // We delete the old row using a shadowable row tombstone, making sure that - // the tombstone deletes everything in the row (or it might still show up). - // FIXME: If the entry is "resurrected" by a later update, we would need to - // ensure that the timestamp for the entry then is bigger than the tombstone - // we're just inserting, which is not currently guaranteed. See CASSANDRA-11500 - // for details. 
- auto ts = existing.marker().timestamp(); - auto set_max_ts = [&ts] (atomic_cell_view&& cell) { - ts = std::max(ts, cell.timestamp()); - }; - existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) { - auto* def = _view_info.view_column(*_base, id); - if (!def) { - return; - } - if (def->is_atomic()) { - set_max_ts(cell.as_atomic_cell()); - } else { - collection_type_impl::for_each_cell(cell.as_collection_mutation(), set_max_ts); - } - }); - get_view_row(base_key, existing).apply(shadowable_tombstone(ts, now)); + r.apply(update.tomb()); } /** @@ -413,12 +440,13 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_ return; } if (is_partition_key_empty(*_base, *_view, base_key, update) || !matches_view_filter(*_base, _view_info, base_key, update, now)) { - do_delete_old_entry(base_key, existing, row_tombstone(), now); + do_delete_old_entry(base_key, existing, update, now); return; } deletable_row& r = get_view_row(base_key, update); - r.apply(compute_row_marker(update)); + auto marker = compute_row_marker(update); + r.apply(marker); r.apply(update.tomb()); auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); @@ -444,13 +472,13 @@ void view_updates::generate_update( auto col_id = _view_info.base_non_pk_column_in_view_pk(); if (!col_id) { // The view key is necessarily the same pre and post update. 
- if (existing && !existing->empty()) { - if (update.empty()) { - delete_old_entry(base_key, *existing, update.tomb(), now); - } else { + if (existing && existing->is_live(*_base)) { + if (update.is_live(*_base)) { update_entry(base_key, update, *existing, now); + } else { + delete_old_entry(base_key, *existing, update, now); } - } else if (!update.empty()) { + } else if (update.is_live(*_base)) { create_entry(base_key, update, now); } return; @@ -469,7 +497,7 @@ void view_updates::generate_update( replace_entry(base_key, update, *existing, now); } } else { - delete_old_entry(base_key, *existing, update.tomb(), now); + delete_old_entry(base_key, *existing, update, now); } return; } @@ -577,13 +605,13 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona // We allow existing to be disengaged, which we treat the same as an empty row. if (existing) { - existing->marker().compact_and_expire(tombstone(), _now, always_gc, gc_before); - existing->cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before, existing->marker()); + existing->marker().compact_and_expire(existing->tomb().tomb(), _now, always_gc, gc_before); + existing->cells().compact_and_expire(*_schema, column_kind::regular_column, existing->tomb(), _now, always_gc, gc_before, existing->marker()); update.apply(*_schema, *existing); } - update.marker().compact_and_expire(tombstone(), _now, always_gc, gc_before); - update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before, update.marker()); + update.marker().compact_and_expire(update.tomb().tomb(), _now, always_gc, gc_before); + update.cells().compact_and_expire(*_schema, column_kind::regular_column, update.tomb(), _now, always_gc, gc_before, update.marker()); for (auto&& v : _view_updates) { v.generate_update(_key, update, existing, _now); From 844e0b41d16f6a99d33c8f64bb23248790fa1b3a Mon Sep 17 00:00:00 2001 From: Duarte 
Nunes Date: Mon, 16 Apr 2018 00:26:55 +0100 Subject: [PATCH 11/14] db/view: Move cells instead of copying in add_cells_to_view() Signed-off-by: Duarte Nunes --- db/view/view.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index e5cd58e050..bf169de1aa 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -349,11 +349,11 @@ static const column_definition* view_column(const schema& base, const schema& vi return view.get_column_definition(base.regular_column_at(base_id).name()); } -static void add_cells_to_view(const schema& base, const schema& view, const row& base_cells, row& view_cells) { - base_cells.for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) { +static void add_cells_to_view(const schema& base, const schema& view, row base_cells, row& view_cells) { + base_cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& c) { auto* view_col = view_column(base, view, id); if (view_col && !view_col->is_primary_key()) { - view_cells.append_cell(view_col->id, c); + view_cells.append_cell(view_col->id, std::move(c)); } }); } @@ -419,7 +419,7 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus auto marker = row_marker(tombstone(ts, now)); r.apply(marker); auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); - add_cells_to_view(*_base, *_view, diff, r.cells()); + add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); } r.apply(update.tomb()); } @@ -450,7 +450,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_ r.apply(update.tomb()); auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); - add_cells_to_view(*_base, *_view, diff, r.cells()); + add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); } void view_updates::generate_update( From 7ba1291731addd84362407395d6e19b224154214 Mon Sep 17 00:00:00 2001 From: Duarte Nunes 
Date: Mon, 16 Apr 2018 00:29:39 +0100 Subject: [PATCH 12/14] tests/view_schema_test: Complete test Signed-off-by: Duarte Nunes --- tests/view_schema_test.cc | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/view_schema_test.cc b/tests/view_schema_test.cc index 6f38f9a0e7..5040e5974b 100644 --- a/tests/view_schema_test.cc +++ b/tests/view_schema_test.cc @@ -2877,6 +2877,30 @@ void complex_timestamp_with_base_pk_columns_in_view_pk_deletion_test(cql_test_en assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(2)}, {int32_type->decompose(1)}, {}, {} }}); }); + // Reset values TS=10 + e.execute_cql("insert into cf (p, c, v1, v2) values (1, 2, 3, 4) using timestamp 10").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf where p = 1 and c = 2").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(3)}, {int32_type->decompose(4)}, {long_type->decompose(10L)} }}); + }); + + // Update values TS=20 + e.execute_cql("update cf using timestamp 20 set v2 = 5 where p = 1 and c = 2").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf where p = 1 and c = 2").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(3)}, {int32_type->decompose(5)}, {long_type->decompose(20L)} }}); + }); + + // Delete row TS=10 + e.execute_cql("delete from cf using timestamp 10 where p = 1 and c = 2").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, v2, WRITETIME(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ { }, {int32_type->decompose(5)}, {long_type->decompose(20L)} }}); + }); + e.execute_cql("drop materialized view vcf").get(); e.execute_cql("drop table cf").get(); } From cc6c96bc9245db5784c2d78f7aa5292901291929 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:03:50 +0100 Subject: [PATCH 13/14] tests: Add view_complex_test This patch 
introduces view_complex_test and adds more test coverage for materialized views. A new file was introduced to avoid making view_schema_test slower. Signed-off-by: Duarte Nunes --- configure.py | 1 + test.py | 1 + tests/view_complex_test.cc | 1193 ++++++++++++++++++++++++++++++++++++ 3 files changed, 1195 insertions(+) create mode 100644 tests/view_complex_test.cc diff --git a/configure.py b/configure.py index bfcd591882..593fbb5fad 100755 --- a/configure.py +++ b/configure.py @@ -274,6 +274,7 @@ scylla_tests = [ 'tests/virtual_reader_test', 'tests/view_schema_test', 'tests/view_build_test', + 'tests/view_complex_test', 'tests/counter_test', 'tests/cell_locker_test', 'tests/row_locker_test', diff --git a/test.py b/test.py index c6f86e9ccc..0a9564a141 100755 --- a/test.py +++ b/test.py @@ -90,6 +90,7 @@ boost_tests = [ 'cell_locker_test', 'view_schema_test', 'view_build_test', + 'view_complex_test', 'clustering_ranges_walker_test', 'vint_serialization_test', 'duration_test', diff --git a/tests/view_complex_test.cc b/tests/view_complex_test.cc new file mode 100644 index 0000000000..55edc87cac --- /dev/null +++ b/tests/view_complex_test.cc @@ -0,0 +1,1193 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ + +#include +#include + +#include "database.hh" +#include "db/view/view_builder.hh" +#include "sstables/compaction_manager.hh" + +#include "tests/test-utils.hh" +#include "tests/cql_test_env.hh" +#include "tests/cql_assertions.hh" + +#include "db/config.hh" + +using namespace std::literals::chrono_literals; + +// Requires on #3362 +#if 0 +void test_partial_delete_unselected_column(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, a int, b int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (p, c)").get(); + + e.execute_cql("update cf using timestamp 10 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(1)}, {int32_type->decompose(1)} }}); + }); + + e.execute_cql("delete b from cf using timestamp 11 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(1)}, {int32_type->decompose(1)} }}); + }); + + e.execute_cql("update cf using timestamp 18 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(1)}, {int32_type->decompose(1)} }}); + }); + + e.execute_cql("update cf using timestamp 20 set a = null where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = 
e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 15").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_unselected_column_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_unselected_column(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_unselected_column_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_unselected_column(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} +#endif + +void test_partial_delete_selected_column(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, a int, b int, e int, f int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select a, b from cf " + "where p is not null and c is not null " + "primary key (p, c)").get(); + + e.execute_cql("update cf using timestamp 10 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("delete b from cf using timestamp 11 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ 
+ {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + e.execute_cql("delete a from cf using timestamp 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 0").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 12 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("delete b from cf using timestamp 13 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("delete from cf using timestamp 14 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 15").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using ttl 3 set b = 1 where p = 1 and c = 1").get(); + maybe_flush(); + 
eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + forward_jump_clocks(4s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("delete from cf using timestamp 15 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + // removal generated by unselected column should not shadow selected column with smaller timestamp + e.execute_cql("update cf using timestamp 18 set e = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 18 set e = null where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 16 set a = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); +} + +SEASTAR_TEST_CASE(test_partial_delete_selected_column_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_selected_column(e, [] { }); + }); +} + 
+SEASTAR_TEST_CASE(test_partial_delete_selected_column_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partial_delete_selected_column(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_update_column_in_view_pk_with_ttl(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int primary key, a int, b int)").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and a is not null " + "primary key (a, p)").get(); + + e.execute_cql("update cf set a = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + }}); + }); + + e.execute_cql("delete a from cf where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p) values (1)").get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using ttl 5 set a = 10 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(10)}, + {int32_type->decompose(1)}, + { }, + }}); + }); + + e.execute_cql("update cf set b = 100 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(10)}, + {int32_type->decompose(1)}, + {int32_type->decompose(100)} + }}); + }); + + forward_jump_clocks(6s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); +} + 
+SEASTAR_TEST_CASE(test_update_column_in_view_pk_with_ttl_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_column_in_view_pk_with_ttl(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_column_in_view_pk_with_ttl_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_column_in_view_pk_with_ttl(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +SEASTAR_TEST_CASE(test_unselected_column_can_preserve_ttld_row_maker) { + return do_with_cql_env_thread([] (auto& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("insert into cf (p, c) values (0, 0) using ttl 60").get(); + e.execute_cql("update cf using ttl 0 set v = 0 where p = 0 and c = 0").get(); + forward_jump_clocks(65s); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ {int32_type->decompose(0)}, {int32_type->decompose(0)}, }}); + }); + }); +} + +void test_update_column_not_in_view(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf using timestamp 0 set v1 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + e.execute_cql("delete v1 from cf using timestamp 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + 
assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set v1 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 2 set v2 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + e.execute_cql("delete v1 from cf using timestamp 3 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + e.execute_cql("delete v2 from cf using timestamp 4 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using ttl 3 set v2 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + forward_jump_clocks(3s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf set v2 = 1 where p = 0 and c = 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(0)}, + {int32_type->decompose(0)} + }}); + }); + + assert_that_failed(e.execute_cql("alter table cf drop v2;")); +} + +SEASTAR_TEST_CASE(test_update_column_not_in_view_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + 
test_update_column_not_in_view(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_column_not_in_view_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_column_not_in_view(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_partial_update_with_unselected_collections(cql_test_env& e, std::function&& maybe_flush) { +e.execute_cql("create table cf (p int, c int, a int, b int, l list, s set, m map, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select a, b from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf set l=l+[1,2,3] where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf set l=l-[1,2] where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf set b = 3 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(3)} + }}); + }); + + e.execute_cql("update cf set b=null, l=l-[3], s=s-{3} where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf set m=m+{3:3}, l=l-[1], s=s-{2} where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + 
assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + assert_that_failed(e.execute_cql("alter table cf drop m;")); +} + +SEASTAR_TEST_CASE(test_partial_update_with_unselected_collections_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partial_update_with_unselected_collections(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partial_update_with_unselected_collections_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partial_update_with_unselected_collections(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_unselected_columns_ttl(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("insert into cf (p, c) values (1, 1) using ttl 3").get(); + e.execute_cql("update cf using ttl 1000 set v = 0 where p = 1 and c = 1").get(); + maybe_flush(); + + forward_jump_clocks(4s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("delete v from cf where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1)").get(); + e.execute_cql("update cf using ttl 3 set v = 0 where p = 1 and c = 1").get(); + e.execute_cql("insert into cf (p, c) values (3, 3) using ttl 3").get(); + + forward_jump_clocks(4s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 1 and c = 1").get0(); + assert_that(msg).is_rows().with_rows({{ 
+ {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + msg = e.execute_cql("select * from vcf where p = 3 and c = 3").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf set v = 0 where p = 3 and c = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf where p = 3 and c = 3").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(3)}, + {int32_type->decompose(3)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_unselected_columns_ttl_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_unselected_columns_ttl(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_unselected_columns_ttl_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_unselected_columns_ttl(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_partition_deletion(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, a int, b int, c int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and a is not null " + "primary key (p, a)").get(); + + e.execute_cql("insert into cf (p, a, b, c) values (1, 1, 1, 1) using timestamp 0").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("update cf using timestamp 1 set a = null where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("delete from cf using timestamp 2 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + 
assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 3 set a = 1, b = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); +} + +SEASTAR_TEST_CASE(test_partition_deletion_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_partition_deletion(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_partition_deletion_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_partition_deletion(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_commutative_row_deletion(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, v1 int, v2 int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and v1 is not null " + "primary key (v1, p)").get(); + + e.execute_cql("insert into cf (p, v1, v2) values (3, 1, 3) using timestamp 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(3)}, + {long_type->decompose(1L)} + }}); + }); + + e.execute_cql("delete from cf using timestamp 2 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, v1) values (3, 1) using timestamp 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using 
timestamp 4 set v1 = 2 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(2)}, + {int32_type->decompose(3)}, + { }, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 5 set v1 = 1 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + { }, + { } + }}); + }); + + e.local_db().get_compaction_manager().submit_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get(); +} + +SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_commutative_row_deletion(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_commutative_row_deletion_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_commutative_row_deletion(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +SEASTAR_TEST_CASE(test_unselected_column_with_expired_marker) { + return do_with_cql_env_thread([] (auto& e) { + e.execute_cql("create table cf (p int, c int, a int, b int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c, b from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf set a = 1 where p = 1 and c = 1").get(); + e.local_db().flush_all_memtables().get(); + e.execute_cql("insert into cf (p, c) values (1, 1) using ttl 5").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + forward_jump_clocks(6s); + + eventually([&] { + auto 
msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + e.execute_cql("update cf set a = null where p = 1 and c = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 1 set b = 1 where p = 1 and c = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + }); +} + +void test_update_with_column_timestamp_smaller_than_pk(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, v1 int, v2 int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and v1 is not null " + "primary key (v1, p)").get(); + + e.execute_cql("insert into cf (p, v1, v2) values (3, 1, 3) using timestamp 6").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); + + e.execute_cql("insert into cf (p) values (3) using timestamp 20").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); + + e.execute_cql("update cf using timestamp 7 set v1 = 2 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select 
v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(2)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); + + e.execute_cql("update cf using timestamp 8 set v1 = 1 where p = 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select v1, p, v2, writetime(v2) from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(3)}, + {int32_type->decompose(3)}, + {long_type->decompose(6L)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_smaller_than_pk_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_smaller_than_pk(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_smaller_than_pk_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_smaller_than_pk(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_expired_marker_with_limit(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, a int, b int, primary key (p))").get(); + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and a is not null " + "primary key (p, a)").get(); + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and a is not null " + "primary key (a, p)").get(); + + for (int i = 1; i <= 100; i++) { + e.execute_cql(sprint("insert into cf (p, a, b) values (%d, %d, %d)", i, i, i)).get(); + } + for (int i = 1; i <= 100; i++) { + if (i % 50 != 0) { + e.execute_cql(sprint("delete a from cf where p = %d", i)).get(); + } + } + + maybe_flush(); + + for (auto view : {"vcf1", "vcf2"}) { + eventually([&] { + auto msg = e.execute_cql(sprint("select * from %s limit 1", view)).get0(); + 
assert_that(msg).is_rows().with_size(1); + msg = e.execute_cql(sprint("select * from %s limit 2", view)).get0(); + assert_that(msg).is_rows().with_size(2); + msg = e.execute_cql(sprint("select * from %s", view)).get0(); + assert_that(msg).is_rows().with_rows({ + {{int32_type->decompose(50)}, {int32_type->decompose(50)}, {int32_type->decompose(50)}}, + {{int32_type->decompose(100)}, {int32_type->decompose(100)}, {int32_type->decompose(100)}}, + }); + + }); + } +} + +SEASTAR_TEST_CASE(test_expired_marker_with_limit_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_expired_marker_with_limit(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_expired_marker_with_limit_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_expired_marker_with_limit(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, a int, b int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and a is not null " + "primary key (p, a)").get(); + + e.execute_cql("delete from cf using timestamp 0 where p = 1").get(); + maybe_flush(); + + e.execute_cql("insert into cf (p, a, b) values (1, 1, 1) using timestamp 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("update cf using timestamp 10 set b = 2 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(2)} + }}); + }); + + e.execute_cql("update cf using 
timestamp 2 set a = 2 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(2)}, + {int32_type->decompose(2)} + }}); + }); + + e.local_db().get_compaction_manager().submit_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(2)}, + {int32_type->decompose(2)} + }}); + }); + + e.execute_cql("update cf using timestamp 11 set a = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(2)} + }}); + }); + + e.execute_cql("update cf using timestamp 12 set a = null where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 13 set a = 1 where p = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf limit 1").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(2)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_bigger_than_pk_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_bigger_than_pk(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_update_with_column_timestamp_bigger_than_pk_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_update_with_column_timestamp_bigger_than_pk(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + 
}, cfg); +} + +void test_no_regular_base_column_in_view_pk(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("update cf using timestamp 1 set v1 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { } + }}); + }); + + e.execute_cql("update cf using timestamp 2 set v1 = null, v2 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); + + e.execute_cql("update cf using timestamp 2 set v2 = null where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("insert into cf (p, c) values (1, 1) using timestamp 3").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + { } + }}); + }); + + e.execute_cql("delete from cf using timestamp 4 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using timestamp 5 set v2 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + 
{int32_type->decompose(1)}, + {int32_type->decompose(1)}, + { }, + {int32_type->decompose(1)} + }}); + }); +} + +SEASTAR_TEST_CASE(test_no_regular_base_column_in_view_pk_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_no_regular_base_column_in_view_pk(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_no_regular_base_column_in_view_pk_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_no_regular_base_column_in_view_pk(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} + +SEASTAR_TEST_CASE(test_shadowing_row_marker) { + return do_with_cql_env_thread([] (auto& e) { + e.execute_cql("create table cf (p int, v1 int, v2 int, primary key (p))").get(); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and v1 is not null " + "primary key (v1, p)").get(); + + e.execute_cql("insert into cf (p, v1, v2) values (1, 1, 1)").get(); + + e.execute_cql("update cf set v1 = null where p = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + + e.execute_cql("update cf using ttl 5 set v1 = 1 where p = 1").get(); + e.local_db().flush_all_memtables().get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_rows({{ + {int32_type->decompose(1)}, + {int32_type->decompose(1)}, + {int32_type->decompose(1)} + }}); + }); + + forward_jump_clocks(6s); + + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); + }); +} + +void test_marker_timestamp_is_not_shadowed_by_previous_update(cql_test_env& e, std::function&& maybe_flush) { + e.execute_cql("create table cf (p int, c int, v1 int, v2 int, primary key (p, c))").get(); + e.execute_cql("create materialized view vcf as select p, c, v1 from cf " + "where p is not 
null and c is not null " + "primary key (c, p)").get(); + + e.execute_cql("insert into cf (p, c, v1, v2) VALUES(1, 1, 1, 1) using ttl 5").get(); + maybe_flush(); + e.execute_cql("update cf using ttl 1000 set v2 = 1 where p = 1 and c = 1").get(); + maybe_flush(); + e.execute_cql("delete v2 from cf where p = 1 and c = 1").get(); + maybe_flush(); + forward_jump_clocks(6s); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().is_empty(); + }); +} + +SEASTAR_TEST_CASE(test_marker_timestamp_is_not_shadowed_by_previous_update_without_flush) { + return do_with_cql_env_thread([] (auto& e) { + test_marker_timestamp_is_not_shadowed_by_previous_update(e, [] { }); + }); +} + +SEASTAR_TEST_CASE(test_marker_timestamp_is_not_shadowed_by_previous_update_with_flush) { + db::config cfg; + cfg.enable_cache(false); + return do_with_cql_env_thread([] (auto& e) { + test_marker_timestamp_is_not_shadowed_by_previous_update(e, [&] { + e.local_db().flush_all_memtables().get(); + }); + }, cfg); +} From c8baba4e3ab8e3a0b9701a3b15314739439cdfb8 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 16 Apr 2018 00:38:21 +0100 Subject: [PATCH 14/14] mutation_partition: Clarify comment about emptiness empty() doesn't distinguish between live and dead data, so clarify that in its comment. Signed-off-by: Duarte Nunes --- mutation_partition.cc | 2 +- mutation_partition.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index aad405bca5..0c5e65c5ef 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1320,7 +1320,7 @@ void mutation_partition::compact_for_compaction(const schema& s, do_compact(s, compaction_time, all_rows, false, query::max_rows, can_gc); } -// Returns true if there is no live data or tombstones. +// Returns true if the mutation_partition represents no writes. 
bool mutation_partition::empty() const { if (_tombstone.timestamp != api::missing_timestamp) { diff --git a/mutation_partition.hh b/mutation_partition.hh index 506f710245..d8a6798923 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -1046,7 +1046,7 @@ public: // Range tombstones will be trimmed to the boundaries of the clustering ranges. mutation_partition sliced(const schema& s, const query::clustering_row_ranges&) const; - // Returns true if there is no live data or tombstones. + // Returns true if the mutation_partition represents no writes. bool empty() const; public: deletable_row& clustered_row(const schema& s, const clustering_key& key);