diff --git a/db/view/view.cc b/db/view/view.cc index bf46d306bb..746771136d 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include "db/view/base_info.hh" #include "db/view/view_build_status.hh" @@ -3486,6 +3487,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q auto view_exploded_ck = ck.explode(); std::vector base_exploded_pk(_base_schema->partition_key_size()); std::vector base_exploded_ck(_base_schema->clustering_key_size()); + std::flat_map view_key_cols_not_in_base_key; for (const column_definition& view_cdef : _view->all_columns()) { const column_definition* base_cdef = _base_schema->get_column_definition(view_cdef.name()); if (base_cdef) { @@ -3494,6 +3496,8 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q base_exploded_pk[base_cdef->id] = view_exploded_key[view_cdef.id]; } else if (base_cdef->is_clustering_key()) { base_exploded_ck[base_cdef->id] = view_exploded_key[view_cdef.id]; + } else if (!base_cdef->is_computed() && view_cdef.is_primary_key()) { + view_key_cols_not_in_base_key[base_cdef] = view_exploded_key[view_cdef.id]; } } } @@ -3501,22 +3505,43 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q clustering_key base_ck = clustering_key::from_exploded(base_exploded_ck); dht::partition_range_vector partition_ranges({dht::partition_range::make_singular(dht::decorate_key(*_base_schema, base_pk))}); - auto selection = cql3::selection::selection::for_columns(_base_schema, std::vector({&_base_schema->partition_key_columns().front()})); + auto selection = cql3::selection::selection::for_columns(_base_schema, + view_key_cols_not_in_base_key.empty() ? std::vector({&_base_schema->partition_key_columns().front()}) : view_key_cols_not_in_base_key.keys()); std::vector bounds{query::clustering_range::make_singular(base_ck)}; - query::partition_slice partition_slice(std::move(bounds), {}, {}, selection->get_query_options()); + utils::small_vector view_key_col_ids; + for (const column_definition* col_def : view_key_cols_not_in_base_key.keys()) { + view_key_col_ids.push_back(col_def->id); + } + query::partition_slice partition_slice(std::move(bounds), {}, std::move(view_key_col_ids), selection->get_query_options()); auto command = ::make_lw_shared(_base_schema->id(), _base_schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit())); auto timeout = db::timeout_clock::now() + _timeout_duration; service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()}; auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get(); query::result& result = *base_qr.query_result; - if (result.row_count().value_or(0) == 0) { + auto delete_ghost_row = [&]() { mutation m(_view, *_view_pk); auto& row = m.partition().clustered_row(*_view, ck); row.apply(tombstone(api::new_timestamp(), gc_clock::now())); timeout = db::timeout_clock::now() + _timeout_duration; _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get(); + }; + if (result.row_count().value_or(0) == 0) { + delete_ghost_row(); + } else if (!view_key_cols_not_in_base_key.empty()) { + if (result.row_count().value_or(0) != 1) { + on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name())); + } + auto results = query::result_set::from_raw_result(_base_schema, partition_slice, result); + auto& base_row = results.row(0); + for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) { + const data_value* base_val = base_row.get_data_value(col_def->name_as_text()); + if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) { + delete_ghost_row(); + break; + } + } } } diff --git a/test/boost/secondary_index_test.cc b/test/boost/secondary_index_test.cc index 22e4453078..ef469209de 100644 --- a/test/boost/secondary_index_test.cc +++ b/test/boost/secondary_index_test.cc @@ -1984,6 +1984,129 @@ SEASTAR_TEST_CASE(test_deleting_ghost_rows) { }); } +SEASTAR_TEST_CASE(test_deleting_ghost_rows_with_same_base_pk) { + return do_with_cql_env_thread([] (auto& e) { + cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))"); + cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT v, p, c FROM t WHERE v IS NOT NULL AND c IS NOT NULL PRIMARY KEY (v, p, c);"); + for (int i = 0; i < 100; i++) { + cquery_nofail(e, format("INSERT INTO t (p,c,v) VALUES ({},{},{})", i, i * 100, i + 100)); + } + + std::vector> expected_view_rows; + for (int i = 0; i < 100; i++) { + expected_view_rows.push_back({int32_type->decompose(i + 100), int32_type->decompose(i), int32_type->decompose(i * 100)}); + } + auto inject_ghost_row = [&e] (int p, int c, int v) { + e.db().invoke_on_all([p, c, v] (replica::database& db) { + schema_ptr schema = db.find_schema("ks", "tv"); + replica::table& t = db.find_column_family(schema); + mutation m(schema, partition_key::from_singular(*schema, v)); + auto& row = m.partition().clustered_row(*schema, clustering_key::from_exploded(*schema, {int32_type->decompose(p), int32_type->decompose(c)})); + row.apply(row_marker{api::new_timestamp()}); + unsigned shard = t.shard_for_reads(m.token()); + if (shard == this_shard_id()) { + t.apply(m); + } + }).get(); + }; + + inject_ghost_row(1, 100, 1111); + eventually([&] { + // The ghost row exists, but it can only be queried from the view, not from the base + auto msg = cquery_nofail(e, "SELECT * FROM tv WHERE v = 1111;"); + assert_that(msg).is_rows().with_rows({ + {int32_type->decompose(1111), int32_type->decompose(1), int32_type->decompose(100)}, + }); + }); + + // Ghost row deletion is attempted for a single view partition + cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv WHERE v = 1111"); + eventually([&] { + // The ghost row is deleted + auto msg = cquery_nofail(e, "SELECT * FROM tv where v = 1111;"); + assert_that(msg).is_rows().with_size(0); + }); + + for (int i = 0; i < 100; ++i) { + inject_ghost_row(i, i * 100, i * 4321); + } + eventually([&] { + auto msg = cquery_nofail(e, "SELECT * FROM tv;"); + assert_that(msg).is_rows().with_size(200); + }); + + // Ghost row deletion is attempted for the whole table + cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv;"); + eventually([&] { + // Ghost rows are deleted + auto msg = cquery_nofail(e, "SELECT * FROM tv;"); + assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows); + }); + + for (int i = 0; i < 100; ++i) { + inject_ghost_row(i, i * 100, i * 2345); + } + eventually([&] { + auto msg = cquery_nofail(e, "SELECT * FROM tv;"); + assert_that(msg).is_rows().with_size(200); + }); + + // Ghost row deletion is attempted with a parallelized table scan + when_all( + e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) >= -9223372036854775807 AND token(v) <= 0"), + e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 0 AND token(v) <= 10000000"), + e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 10000000 AND token(v) <= 20000000"), + e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 20000000 AND token(v) <= 30000000"), + e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 30000000 AND token(v) <= 9223372036854775807") + ).get(); + eventually([&] { + // Ghost rows are deleted + auto msg = cquery_nofail(e, "SELECT * FROM tv;"); + assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows); + }); + }); +} + +SEASTAR_TEST_CASE(test_not_deleting_rows_with_different_regular_columns) { + return do_with_cql_env_thread([] (auto& e) { + cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))"); + cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT c, p, v FROM t WHERE v IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);"); + for (int i = 0; i < 100; i++) { + cquery_nofail(e, format("INSERT INTO t (p,c,v) VALUES ({},{},{})", i, i * 100, i + 100)); + } + + auto inject_different_but_not_ghost_row = [&e] (int p, int c, int v) { + e.db().invoke_on_all([p, c, v] (replica::database& db) { + schema_ptr schema = db.find_schema("ks", "tv"); + replica::table& t = db.find_column_family(schema); + mutation m(schema, partition_key::from_singular(*schema, c)); + auto timestamp = api::new_timestamp(); + auto ck = clustering_key::from_exploded(*schema, {int32_type->decompose(p)}); + auto& row = m.partition().clustered_row(*schema, ck); + m.set_cell(ck, to_bytes("v"), v, timestamp); + row.apply(row_marker{timestamp}); + unsigned shard = t.shard_for_reads(m.token()); + if (shard == this_shard_id()) { + t.apply(m); + } + }).get(); + }; + + for (int i = 0; i < 100; ++i) { + inject_different_but_not_ghost_row(i, i * 100, i + 101); + } + std::vector> expected_view_rows; + for (int i = 0; i < 100; i++) { + expected_view_rows.push_back({int32_type->decompose(i * 100), int32_type->decompose(i), int32_type->decompose(i + 101)}); + } + + cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv"); + // Not a single row should get deleted + auto msg = cquery_nofail(e, "SELECT * FROM tv;"); + assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows); + }); +} + SEASTAR_TEST_CASE(test_returning_failure_from_ghost_rows_deletion) { return do_with_cql_env_thread([] (auto& e) { cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))");