mv: delete previously undetected ghost rows in PRUNE MATERIALIZED VIEW statement

The PRUNE MATERIALIZED VIEW statement is supposed to remove ghost rows from the
view. Ghost rows are rows in the view with no corresponding row in the base table.
Before this patch, only rows whose primary key columns of the base table had
different values than any of the base rows were treated as ghost rows by the PRUNE
statement. However, view rows which have a column in their primary key that's not
in the base primary can also be ghost rows if this column has a different value
than the base row with the same values of remaining primary key columns. That's
because these rows won't be deleted unless we change value of this column in the
base table to this specific value.
In this patch we add a check for this column in the PRUNE MATERIALIZED VIEW logic.
If this column isn't the same in the base table and the view, these rows are also
deleted.

Fixes https://github.com/scylladb/scylladb/issues/25655

Closes scylladb/scylladb#25720
This commit is contained in:
Wojciech Mitros
2025-08-28 10:25:10 +02:00
committed by Piotr Dulikowski
parent 520cc0eeaa
commit 1f9be235b8
2 changed files with 151 additions and 3 deletions

View File

@@ -24,6 +24,7 @@
#include <seastar/core/future-util.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <flat_map>
#include "db/view/base_info.hh"
#include "db/view/view_build_status.hh"
@@ -3486,6 +3487,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
auto view_exploded_ck = ck.explode();
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
std::flat_map<const column_definition*, bytes> view_key_cols_not_in_base_key;
for (const column_definition& view_cdef : _view->all_columns()) {
const column_definition* base_cdef = _base_schema->get_column_definition(view_cdef.name());
if (base_cdef) {
@@ -3494,6 +3496,8 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
base_exploded_pk[base_cdef->id] = view_exploded_key[view_cdef.id];
} else if (base_cdef->is_clustering_key()) {
base_exploded_ck[base_cdef->id] = view_exploded_key[view_cdef.id];
} else if (!base_cdef->is_computed() && view_cdef.is_primary_key()) {
view_key_cols_not_in_base_key[base_cdef] = view_exploded_key[view_cdef.id];
}
}
}
@@ -3501,22 +3505,43 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
clustering_key base_ck = clustering_key::from_exploded(base_exploded_ck);
dht::partition_range_vector partition_ranges({dht::partition_range::make_singular(dht::decorate_key(*_base_schema, base_pk))});
auto selection = cql3::selection::selection::for_columns(_base_schema, std::vector<const column_definition*>({&_base_schema->partition_key_columns().front()}));
auto selection = cql3::selection::selection::for_columns(_base_schema,
view_key_cols_not_in_base_key.empty() ? std::vector<const column_definition*>({&_base_schema->partition_key_columns().front()}) : view_key_cols_not_in_base_key.keys());
std::vector<query::clustering_range> bounds{query::clustering_range::make_singular(base_ck)};
query::partition_slice partition_slice(std::move(bounds), {}, {}, selection->get_query_options());
utils::small_vector<column_id, 8> view_key_col_ids;
for (const column_definition* col_def : view_key_cols_not_in_base_key.keys()) {
view_key_col_ids.push_back(col_def->id);
}
query::partition_slice partition_slice(std::move(bounds), {}, std::move(view_key_col_ids), selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(_base_schema->id(), _base_schema->version(), partition_slice,
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
auto timeout = db::timeout_clock::now() + _timeout_duration;
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
query::result& result = *base_qr.query_result;
if (result.row_count().value_or(0) == 0) {
auto delete_ghost_row = [&]() {
mutation m(_view, *_view_pk);
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
};
if (result.row_count().value_or(0) == 0) {
delete_ghost_row();
} else if (!view_key_cols_not_in_base_key.empty()) {
if (result.row_count().value_or(0) != 1) {
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
}
auto results = query::result_set::from_raw_result(_base_schema, partition_slice, result);
auto& base_row = results.row(0);
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
delete_ghost_row();
break;
}
}
}
}

View File

@@ -1984,6 +1984,129 @@ SEASTAR_TEST_CASE(test_deleting_ghost_rows) {
});
}
SEASTAR_TEST_CASE(test_deleting_ghost_rows_with_same_base_pk) {
return do_with_cql_env_thread([] (auto& e) {
cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))");
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT v, p, c FROM t WHERE v IS NOT NULL AND c IS NOT NULL PRIMARY KEY (v, p, c);");
for (int i = 0; i < 100; i++) {
cquery_nofail(e, format("INSERT INTO t (p,c,v) VALUES ({},{},{})", i, i * 100, i + 100));
}
std::vector<std::vector<bytes_opt>> expected_view_rows;
for (int i = 0; i < 100; i++) {
expected_view_rows.push_back({int32_type->decompose(i + 100), int32_type->decompose(i), int32_type->decompose(i * 100)});
}
auto inject_ghost_row = [&e] (int p, int c, int v) {
e.db().invoke_on_all([p, c, v] (replica::database& db) {
schema_ptr schema = db.find_schema("ks", "tv");
replica::table& t = db.find_column_family(schema);
mutation m(schema, partition_key::from_singular(*schema, v));
auto& row = m.partition().clustered_row(*schema, clustering_key::from_exploded(*schema, {int32_type->decompose(p), int32_type->decompose(c)}));
row.apply(row_marker{api::new_timestamp()});
unsigned shard = t.shard_for_reads(m.token());
if (shard == this_shard_id()) {
t.apply(m);
}
}).get();
};
inject_ghost_row(1, 100, 1111);
eventually([&] {
// The ghost row exists, but it can only be queried from the view, not from the base
auto msg = cquery_nofail(e, "SELECT * FROM tv WHERE v = 1111;");
assert_that(msg).is_rows().with_rows({
{int32_type->decompose(1111), int32_type->decompose(1), int32_type->decompose(100)},
});
});
// Ghost row deletion is attempted for a single view partition
cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv WHERE v = 1111");
eventually([&] {
// The ghost row is deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv where v = 1111;");
assert_that(msg).is_rows().with_size(0);
});
for (int i = 0; i < 100; ++i) {
inject_ghost_row(i, i * 100, i * 4321);
}
eventually([&] {
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_size(200);
});
// Ghost row deletion is attempted for the whole table
cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv;");
eventually([&] {
// Ghost rows are deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows);
});
for (int i = 0; i < 100; ++i) {
inject_ghost_row(i, i * 100, i * 2345);
}
eventually([&] {
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_size(200);
});
// Ghost row deletion is attempted with a parallelized table scan
when_all(
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) >= -9223372036854775807 AND token(v) <= 0"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 0 AND token(v) <= 10000000"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 10000000 AND token(v) <= 20000000"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 20000000 AND token(v) <= 30000000"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 30000000 AND token(v) <= 9223372036854775807")
).get();
eventually([&] {
// Ghost rows are deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows);
});
});
}
SEASTAR_TEST_CASE(test_not_deleting_rows_with_different_regular_columns) {
return do_with_cql_env_thread([] (auto& e) {
cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))");
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT c, p, v FROM t WHERE v IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p);");
for (int i = 0; i < 100; i++) {
cquery_nofail(e, format("INSERT INTO t (p,c,v) VALUES ({},{},{})", i, i * 100, i + 100));
}
auto inject_different_but_not_ghost_row = [&e] (int p, int c, int v) {
e.db().invoke_on_all([p, c, v] (replica::database& db) {
schema_ptr schema = db.find_schema("ks", "tv");
replica::table& t = db.find_column_family(schema);
mutation m(schema, partition_key::from_singular(*schema, c));
auto timestamp = api::new_timestamp();
auto ck = clustering_key::from_exploded(*schema, {int32_type->decompose(p)});
auto& row = m.partition().clustered_row(*schema, ck);
m.set_cell(ck, to_bytes("v"), v, timestamp);
row.apply(row_marker{timestamp});
unsigned shard = t.shard_for_reads(m.token());
if (shard == this_shard_id()) {
t.apply(m);
}
}).get();
};
for (int i = 0; i < 100; ++i) {
inject_different_but_not_ghost_row(i, i * 100, i + 101);
}
std::vector<std::vector<bytes_opt>> expected_view_rows;
for (int i = 0; i < 100; i++) {
expected_view_rows.push_back({int32_type->decompose(i * 100), int32_type->decompose(i), int32_type->decompose(i + 101)});
}
cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv");
// Not a single row should get deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows);
});
}
SEASTAR_TEST_CASE(test_returning_failure_from_ghost_rows_deletion) {
return do_with_cql_env_thread([] (auto& e) {
cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))");