From ec8960df45fa7e64cb7548ffffb9e99fec06700c Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Tue, 3 Apr 2018 13:22:43 +0100 Subject: [PATCH] db/view: Reject view entries with non-composite, empty partition key Empty partition keys are not supported on normal tables - they cannot be inserted or queried (surprisingly, the rules for composite partition keys are different: all components are then allowed to be empty). However, the (non-composite) partition key of a view could end up being empty if that column is: a base table regular column, a base table clustering key column, or a base table partition key column, part of a composite key. Fixes #3262 Refs CASSANDRA-14345 Signed-off-by: Duarte Nunes Message-Id: <20180403122244.10626-1-duarte@scylladb.com> --- db/view/view.cc | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index a899aaeefa..f5ec146b43 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -189,6 +189,31 @@ static bool update_requires_read_before_write(const schema& base, return false; } +static bool is_partition_key_empty( + const schema& base, + const schema& view_schema, + const partition_key& base_key, + const clustering_row& update) { + // Empty partition keys are not supported on normal tables - they cannot + // be inserted or queried, so enforce those rules here. + if (view_schema.partition_key_columns().size() > 1) { + // Composite partition keys are different: all components + // are then allowed to be empty. + return false; + } + auto* base_col = base.get_column_definition(view_schema.partition_key_columns().front().name()); + switch (base_col->kind) { + case column_kind::partition_key: + return base_key.get_component(base, base_col->position()).empty(); + case column_kind::clustering_key: + return update.key().get_component(base, base_col->position()).empty(); + default: + // No multi-cell columns in the view's partition key + auto& c = update.cells().cell_at(base_col->id); + return c.as_atomic_cell().value().empty(); + } +} + bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now) { return clustering_prefix_matches(base, view, key, update.key()) && boost::algorithm::all_of( @@ -344,7 +369,7 @@ static void add_cells_to_view(const schema& base, const schema& view, const row& * This method checks that the base row does match the view filter before applying anything. */ void view_updates::create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now) { - if (!matches_view_filter(*_base, _view_info, base_key, update, now)) { + if (is_partition_key_empty(*_base, *_view, base_key, update) || !matches_view_filter(*_base, _view_info, base_key, update, now)) { return; } deletable_row& r = get_view_row(base_key, update); @@ -360,7 +385,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) { // Before deleting an old entry, make sure it was matching the view filter // (otherwise there is nothing to delete) - if (matches_view_filter(*_base, _view_info, base_key, existing, now)) { + if (!is_partition_key_empty(*_base, *_view, base_key, existing) && matches_view_filter(*_base, _view_info, base_key, existing, now)) { do_delete_old_entry(base_key, existing, t, now); } } @@ -405,11 +430,11 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus void view_updates::update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { // While we know update and existing correspond to the same view entry, // they may not match the view filter. - if (!matches_view_filter(*_base, _view_info, base_key, existing, now)) { + if (is_partition_key_empty(*_base, *_view, base_key, existing) || !matches_view_filter(*_base, _view_info, base_key, existing, now)) { create_entry(base_key, update, now); return; } - if (!matches_view_filter(*_base, _view_info, base_key, update, now)) { + if (is_partition_key_empty(*_base, *_view, base_key, update) || !matches_view_filter(*_base, _view_info, base_key, update, now)) { do_delete_old_entry(base_key, existing, row_tombstone(), now); return; }