Files
scylladb/cql3/statements/create_view_statement.cc
Michael Litvak 868ac42a8b tombstone_gc: don't use 'repair' mode for colocated tables
For tables of special types that can be located: MV, CDC, and paxos
table, we should not use tombstone_gc=repair mode because colocated
tablets are never repaired, hence they will not have repair_time set and
will never be GC'd using 'repair' mode.
2025-11-25 09:15:46 +01:00

456 lines
22 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "exceptions/exceptions.hh"
#include "utils/assert.hh"
#include <unordered_set>
#include <vector>
#include <boost/regex.hpp>
#include <seastar/core/coroutine.hh>
#include "cql3/column_identifier.hh"
#include "cql3/restrictions/statement_restrictions.hh"
#include "cql3/statements/create_view_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/statements/select_statement.hh"
#include "cql3/statements/raw/select_statement.hh"
#include "cql3/query_processor.hh"
#include "cql3/util.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "validation.hh"
#include "data_dictionary/data_dictionary.hh"
#include "gms/feature_service.hh"
#include "db/view/view.hh"
#include "service/migration_manager.hh"
#include "replica/database.hh"
#include "db/config.hh"
namespace cql3 {
namespace statements {
create_view_statement::create_view_statement(
cf_name view_name,
cf_name base_name,
std::vector<::shared_ptr<selection::raw_selector>> select_clause,
expr::expression where_clause,
std::vector<::shared_ptr<cql3::column_identifier::raw>> partition_keys,
std::vector<::shared_ptr<cql3::column_identifier::raw>> clustering_keys,
bool if_not_exists)
: schema_altering_statement{view_name}
, _base_name{base_name}
, _select_clause{select_clause}
, _where_clause{where_clause}
, _partition_keys{partition_keys}
, _clustering_keys{clustering_keys}
, _if_not_exists{if_not_exists}
{
}
future<> create_view_statement::check_access(query_processor& qp, const service::client_state& state) const {
return state.has_column_family_access(keyspace(), _base_name.get_column_family(), auth::permission::ALTER);
}
static const column_definition* get_column_definition(const schema& schema, column_identifier::raw& identifier) {
auto prepared = identifier.prepare(schema);
SCYLLA_ASSERT(dynamic_pointer_cast<column_identifier>(prepared));
auto id = static_pointer_cast<column_identifier>(prepared);
return schema.get_column_definition(id->name());
}
static bool validate_primary_key(
const schema& schema,
const column_definition* def,
const std::unordered_set<const column_definition*>& base_pk,
bool has_non_pk_column,
const restrictions::statement_restrictions& restrictions) {
if (def->type->is_multi_cell()) {
throw exceptions::invalid_request_exception(format("Cannot use MultiCell column '{}' in PRIMARY KEY of materialized view", def->name_as_text()));
}
if (def->type->references_duration()) {
throw exceptions::invalid_request_exception(format("Cannot use Duration column '{}' in PRIMARY KEY of materialized view", def->name_as_text()));
}
if (def->is_static()) {
throw exceptions::invalid_request_exception(format("Cannot use Static column '{}' in PRIMARY KEY of materialized view", def->name_as_text()));
}
bool new_non_pk_column = false;
if (!base_pk.contains(def)) {
if (has_non_pk_column) {
throw exceptions::invalid_request_exception(format("Cannot include more than one non-primary key column '{}' in materialized view primary key", def->name_as_text()));
}
new_non_pk_column = true;
}
// We don't need to include the "IS NOT NULL" filter on a non-composite partition key
// because we will never allow a single partition key to be NULL
bool is_non_composite_partition_key = def->is_partition_key() &&
schema.partition_key_columns().size() == 1;
if (!is_non_composite_partition_key && !restrictions.is_restricted(def)) {
throw exceptions::invalid_request_exception(format("Primary key column '{}' is required to be filtered by 'IS NOT NULL'", def->name_as_text()));
}
return new_non_pk_column;
}
std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(data_dictionary::database db) const {
// We need to make sure that:
// - materialized view name is valid
// - primary key includes all columns in base table's primary key
// - make sure that the select statement does not have anything other than columns
// and their names match the base table's names
// - make sure that primary key does not include any collections
// - make sure there is no where clause in the select statement
// - make sure there is not currently a table or view
// - make sure base_table gc_grace_seconds > 0
cql3::cql_warnings_vec warnings;
auto schema_extensions = _properties.properties()->make_schema_extensions(db.extensions());
_properties.validate(db, keyspace(), schema_extensions);
if (_properties.use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));
}
if (_properties.properties()->get_cdc_options(schema_extensions)) {
throw exceptions::invalid_request_exception("Cannot enable CDC for a materialized view");
}
// View and base tables must be in the same keyspace, to ensure that RF
// is the same (because we assign a view replica to each base replica).
// If a keyspace was not specified for the base table name, it is assumed
// it is in the same keyspace as the view table being created (which
// itself might be the current USEd keyspace, or explicitly specified).
if (!_base_name.has_keyspace()) {
_base_name.set_keyspace(keyspace(), true);
}
if (_base_name.get_keyspace() != keyspace()) {
throw exceptions::invalid_request_exception(format("Cannot create a materialized view on a table in a separate keyspace ('{}' != '{}')",
_base_name.get_keyspace(), keyspace()));
}
const sstring& cf_name = column_family();
boost::regex name_regex("\\w+");
if (!boost::regex_match(std::string(cf_name), name_regex)) {
throw exceptions::invalid_request_exception(format("\"{}\" is not a valid materialized view name (must contain alphanumeric character only: [0-9A-Za-z] or the underscore)", cf_name.c_str()));
}
if (cf_name.size() > size_t(schema::NAME_LENGTH)) {
throw exceptions::invalid_request_exception(format("materialized view names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, cf_name.c_str()));
}
schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
if (schema->is_counter()) {
throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
}
if (schema->is_view()) {
throw exceptions::invalid_request_exception(format("Materialized views cannot be created against other materialized views"));
}
if (db.get_cdc_base_table(*schema)) {
throw exceptions::invalid_request_exception(format("Materialized views cannot be created on CDC Log tables"));
}
if (schema->gc_grace_seconds().count() == 0) {
throw exceptions::invalid_request_exception(fmt::format(
"Cannot create materialized view '{}' for base table "
"'{}' with gc_grace_seconds of 0, since this value is "
"used to TTL undelivered updates. Setting gc_grace_seconds "
"too low might cause undelivered updates to expire "
"before being replayed.", column_family(), _base_name.get_column_family()));
}
// Gather all included columns, as specified by the select clause
auto included = _select_clause | std::views::transform([&](auto&& selector) {
if (selector->alias) {
throw exceptions::invalid_request_exception(format("Cannot use alias when defining a materialized view"));
}
auto& selectable = selector->selectable_;
shared_ptr<column_identifier::raw> identifier;
expr::visit(overloaded_functor{
[&] (const expr::unresolved_identifier& ui) { identifier = ui.ident; },
[] (const auto& default_case) -> void { throw exceptions::invalid_request_exception(format("Cannot use general expressions when defining a materialized view")); },
}, selectable);
auto* def = get_column_definition(*schema, *identifier);
if (!def) {
throw exceptions::invalid_request_exception(format("Unknown column name detected in CREATE MATERIALIZED VIEW statement: {}", identifier));
}
return def;
}) | std::ranges::to<std::unordered_set<const column_definition*>>();
auto parameters = make_lw_shared<raw::select_statement::parameters>(raw::select_statement::parameters::orderings_type(), false, true);
raw::select_statement raw_select(_base_name, std::move(parameters), _select_clause, _where_clause, std::nullopt, std::nullopt, {}, std::make_unique<cql3::attributes::raw>());
raw_select.prepare_keyspace(keyspace());
raw_select.set_bound_variables({});
cql_stats ignored;
auto prepared = raw_select.prepare(db, ignored, true);
auto restrictions = static_pointer_cast<statements::select_statement>(prepared->statement)->get_restrictions();
auto base_primary_key_cols =
schema->primary_key_columns()
| std::views::transform([](auto&& def) { return &def; })
| std::ranges::to<std::unordered_set<const column_definition*>>();
// Validate the primary key clause, ensuring only one non-PK base column is used in the view's PK.
bool has_non_pk_column = false;
std::unordered_set<const column_definition*> target_primary_keys;
std::vector<const column_definition*> target_partition_keys;
std::vector<const column_definition*> target_clustering_keys;
auto validate_pk = [&] (const std::vector<::shared_ptr<cql3::column_identifier::raw>>& keys, std::vector<const column_definition*>& target_keys) mutable {
for (auto&& identifier : keys) {
auto* def = get_column_definition(*schema, *identifier);
if (!def) {
throw exceptions::invalid_request_exception(format("Unknown column name detected in CREATE MATERIALIZED VIEW statement: {}", identifier));
}
if (!target_primary_keys.insert(def).second) {
throw exceptions::invalid_request_exception(format("Duplicate entry found in PRIMARY KEY: {}", identifier));
}
target_keys.push_back(def);
has_non_pk_column |= validate_primary_key(*schema, def, base_primary_key_cols, has_non_pk_column, *restrictions);
}
};
validate_pk(_partition_keys, target_partition_keys);
validate_pk(_clustering_keys, target_clustering_keys);
std::vector<const column_definition*> missing_pk_columns;
std::vector<const column_definition*> target_non_pk_columns;
std::vector<const column_definition*> unselected_columns;
// We need to include all of the primary key columns from the base table in order to make sure that we do not
// overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
// the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
// used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
// that they include all of the columns. We provide them with a list of all of the columns left to include.
for (auto& def : schema->all_columns()) {
bool included_def = included.empty() || included.contains(&def);
if (included_def && def.is_static()) {
throw exceptions::invalid_request_exception(format("Unable to include static column '{}' which would be included by Materialized View SELECT * statement", def.name_as_text()));
}
bool def_in_target_pk = target_primary_keys.contains(&def);
if (included_def && !def_in_target_pk) {
target_non_pk_columns.push_back(&def);
}
if (!included_def && !def_in_target_pk && !def.is_static()) {
unselected_columns.push_back(&def);
}
if (def.is_primary_key() && !def_in_target_pk) {
missing_pk_columns.push_back(&def);
}
}
if (!missing_pk_columns.empty()) {
throw exceptions::invalid_request_exception(seastar::format(
"Cannot create Materialized View {} without primary key columns from base {} ({})",
column_family(), _base_name.get_column_family(),
fmt::join(missing_pk_columns | std::views::transform(std::mem_fn(&column_definition::name_as_text)), ", ")));
}
if (_partition_keys.empty()) {
throw exceptions::invalid_request_exception(format("Must select at least a column for a Materialized View"));
}
// Cassandra requires that if CLUSTERING ORDER BY is used, it must specify
// all clustering columns. Scylla relaxes this requirement and allows just
// a subset of the clustering columns. But it doesn't make sense (and it's
// forbidden) to list something which is not a clustering key column in
// the CLUSTERING ORDER BY. Let's verify that:
for (auto& pair: _properties.defined_ordering()) {
auto&& name = pair.first->text();
bool not_clustering = true;
for (auto& c : _clustering_keys) {
if (name == c->to_string()) {
not_clustering = false;
break;
}
}
if (not_clustering) {
throw exceptions::invalid_request_exception(format("CLUSTERING ORDER BY lists {} which is not a clustering column in the view", name));
}
}
// The unique feature of a filter by a non-key column is that the
// value of such column can be updated - and also be expired with TTL
// and cause the view row to appear and disappear. We don't currently
// support support this case - see issue #3430, and neither does
// Cassandra - see see CASSANDRA-13798 and CASSANDRA-13832.
// Actually, as CASSANDRA-13798 explains, the problem is "the liveness of
// view row is now depending on multiple base columns (multiple filtered
// non-pk base column + base column used in view pk)". When the filtered
// column *is* the base column added to the view pk, we don't have this
// problem. And this case actually works correctly.
const expr::single_column_restrictions_map& non_pk_restrictions = restrictions->get_non_pk_restriction();
if (non_pk_restrictions.size() == 1 && has_non_pk_column &&
target_primary_keys.contains(non_pk_restrictions.cbegin()->first)) {
// This case (filter by new PK column of the view) works, as explained above
} else if (!non_pk_restrictions.empty()) {
throw exceptions::invalid_request_exception(seastar::format("Non-primary key columns cannot be restricted in the SELECT statement used for materialized view {} creation (got restrictions on: {})",
column_family(),
fmt::join(non_pk_restrictions | std::views::keys | std::views::transform(std::mem_fn(&column_definition::name_as_text)), ", ")));
}
// IS NOT NULL restrictions are handled separately from other restrictions.
// They need a separate check as they won't be included in non_pk_restrictions.
std::vector<std::string_view> invalid_not_null_column_names;
for (const column_definition* not_null_cdef : restrictions->get_not_null_columns()) {
if (!target_primary_keys.contains(not_null_cdef)) {
invalid_not_null_column_names.push_back(not_null_cdef->name_as_text());
}
}
if (!invalid_not_null_column_names.empty() &&
db.get_config().strict_is_not_null_in_views() == db::tri_mode_restriction_t::mode::TRUE) {
throw exceptions::invalid_request_exception(
fmt::format("The IS NOT NULL restriction is allowed only columns which are part of the view's primary key,"
" found columns: {}. The flag strict_is_not_null_in_views can be used to turn this error "
"into a warning, or to silence it. (true - error, warn - warning, false - silent)",
fmt::join(invalid_not_null_column_names, ", ")));
}
if (!invalid_not_null_column_names.empty() &&
db.get_config().strict_is_not_null_in_views() == db::tri_mode_restriction_t::mode::WARN) {
sstring warning_text = fmt::format(
"The IS NOT NULL restriction is allowed only columns which are part of the view's primary key,"
" found columns: {}. Restrictions on these columns will be silently ignored. "
"The flag strict_is_not_null_in_views can be used to turn this warning into an error, or to silence it. "
"(true - error, warn - warning, false - silent)",
fmt::join(invalid_not_null_column_names, ", "));
warnings.emplace_back(std::move(warning_text));
}
const auto maybe_id = _properties.properties()->get_id();
if (maybe_id && db.try_find_table(*maybe_id)) {
const auto schema_ptr = db.find_schema(*maybe_id);
const auto& ks_name = schema_ptr->ks_name();
const auto& cf_name = schema_ptr->cf_name();
throw exceptions::invalid_request_exception(seastar::format("Table with ID {} already exists: {}.{}", *maybe_id, ks_name, cf_name));
}
schema_builder builder{keyspace(), column_family(), maybe_id};
auto add_columns = [this, &builder] (std::vector<const column_definition*>& defs, column_kind kind) mutable {
for (auto* def : defs) {
auto&& type = _properties.get_reversable_type(*def->column_specification->name, def->type);
builder.with_column(def->name(), type, kind);
}
};
add_columns(target_partition_keys, column_kind::partition_key);
add_columns(target_clustering_keys, column_kind::clustering_key);
add_columns(target_non_pk_columns, column_kind::regular_column);
// Add all unselected columns (base-table columns which are not selected
// in the view) as "virtual columns" - columns which have timestamp and
// ttl information, but an empty value. These are needed to keep view
// rows alive when the base row is alive, even if the view row has no
// data, just a key (see issue #3362). The virtual columns are not needed
// when the view pk adds a regular base column (i.e., has_non_pk_column)
// because in that case, the liveness of that base column is what
// determines the liveness of the view row.
if (!has_non_pk_column) {
for (auto* def : unselected_columns) {
db::view::create_virtual_column(builder, def->name(), def->type);
}
}
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
if (target_partition_keys.size() != schema->partition_key_columns().size()) {
return false;
}
for (size_t i = 0; i < target_partition_keys.size(); ++i) {
if (target_partition_keys[i] != &schema->partition_key_columns()[i]) {
return false;
}
}
return true;
}();
if (is_colocated) {
auto gc_opts = _properties.properties()->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
}
}
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(
"Cannot set or alter default_time_to_live for a materialized view. "
"Data in a materialized view always expire at the same time than "
"the corresponding data in the parent table.");
}
auto where_clause_text = util::relations_to_where_clause(_where_clause);
builder.with_view_info(schema, included.empty(), std::move(where_clause_text));
return std::make_pair(view_ptr(builder.build()), std::move(warnings));
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chunked_vector<mutation>, cql3::cql_warnings_vec>>
create_view_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
utils::chunked_vector<mutation> m;
auto [definition, warnings] = prepare_view(qp.db());
try {
m = co_await service::prepare_new_view_announcement(qp.proxy(), std::move(definition), ts);
} catch (const exceptions::already_exists_exception& e) {
if (!_if_not_exists) {
co_return coroutine::exception(std::current_exception());
}
}
// If an IF NOT EXISTS clause was used and resource was already created
// we shouldn't emit created event. However it interacts badly with
// concurrent clients creating resources. The client seeing no create event
// assumes resource already previously existed and proceeds with its logic
// which may depend on that resource. But it may send requests to nodes which
// are not yet aware of new schema or client's metadata may be outdated.
// To force synchronization always emit the event (see
// github.com/scylladb/scylladb/issues/16909).
co_return std::make_tuple(created_event(), std::move(m), std::move(warnings));
}
std::unique_ptr<cql3::statements::prepared_statement>
create_view_statement::prepare(data_dictionary::database db, cql_stats& stats) {
if (!_prepare_ctx.get_variable_specifications().empty()) {
throw exceptions::invalid_request_exception(format("Cannot use query parameters in CREATE MATERIALIZED VIEW statements"));
}
return std::make_unique<prepared_statement>(audit_info(), make_shared<create_view_statement>(*this));
}
::shared_ptr<schema_altering_statement::event_t> create_view_statement::created_event() const {
return make_shared<event_t>(
event_t::change_type::CREATED,
event_t::target_type::TABLE,
keyspace(),
column_family());
}
}
}