Files
scylladb/db/schema_tables.hh
Michael Litvak ac96e40f13 schema: add pointer to CDC schema
Add to the schema object a member that points to the CDC schema object
that is compatible with this schema, if any.

The compatible CDC schema is created and altered with its base schema in
the same group0 operation.

When generating CDC log mutations for some base mutation we want them to
be created using a compatible schema that has a CDC column corresponding
to each base column. This change will allow us to find the right CDC
schema given a base mutation.

We also update the relevant structures in the schema registry that are
related to learning about schemas and transporting schemas across
shards or nodes.

When transporting a schema as frozen_schema, we need to transport the
frozen CDC schema as well, and set it again when unfreezing and
reconstructing the schema.

When adding a schema to the registry, we need to ensure its CDC schema
is added to the registry as well.

Currently we always set the CDC schema to nullptr and maintain the
previous behavior. We will change it in a later commit. Until then, we
mark all places where CDC schema is passed clearly so we don't forget
it.
2025-10-21 14:13:43 +02:00

348 lines
15 KiB
C++

/*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "cql3/functions/functions.hh"
#include "mutation/mutation.hh"
#include "schema/schema_fwd.hh"
#include "schema_features.hh"
#include "utils/hashing.hh"
#include "schema/schema_mutations.hh"
#include "types/map.hh"
#include "query/query-result-set.hh"
#include "db/view/base_info.hh"
#include <seastar/core/sharded.hh>
#include <vector>
#include <map>
// Forward declarations of types referenced by the declarations below.
namespace data_dictionary { class keyspace_metadata; class user_types_storage; }
using keyspace_metadata = data_dictionary::keyspace_metadata;

namespace replica { class database; }
namespace query { class result_set; }
namespace service { class storage_service; class storage_proxy; }
namespace gms { class feature_service; }
namespace cql3::functions { class user_function; class user_aggregate; }
namespace db {
class system_keyspace;
class extensions;
class config;

// Context needed when building schema objects out of schema mutations (it is
// the first argument of create_table_from_mutations() and
// create_view_from_mutations() declared further down). It exposes the
// extensions, partitioner and schema-registry settings, cluster features,
// user types and, optionally, the local database.
class schema_ctxt {
public:
    schema_ctxt(const config&, std::shared_ptr<data_dictionary::user_types_storage> uts, const gms::feature_service&,
            replica::database* = nullptr);
    schema_ctxt(replica::database&);
    schema_ctxt(sharded<replica::database>&);
    schema_ctxt(sharded<service::storage_proxy>&);

    const db::extensions& extensions() const { return _extensions; }
    unsigned murmur3_partitioner_ignore_msb_bits() const { return _murmur3_partitioner_ignore_msb_bits; }
    uint32_t schema_registry_grace_period() const { return _schema_registry_grace_period; }
    const gms::feature_service& features() const { return _features; }

    // Dereferences the stored pointer unconditionally, so it must have been
    // set to a non-null value at construction (TODO confirm all ctors do).
    const data_dictionary::user_types_storage& user_types() const noexcept { return *_user_types; }

    // May be nullptr: the config-based constructor takes the database pointer
    // with a nullptr default (presumably stored here as-is).
    replica::database* get_db() const { return _db; }

private:
    replica::database* _db;
    const gms::feature_service& _features;
    const db::extensions& _extensions;
    const unsigned _murmur3_partitioner_ignore_msb_bits;
    const uint32_t _schema_registry_grace_period;
    const std::shared_ptr<data_dictionary::user_types_storage> _user_types;
};
namespace schema_tables {
extern logging::logger slogger;
using schema_result = std::map<sstring, lw_shared_ptr<query::result_set>>;
using schema_result_value_type = std::pair<sstring, lw_shared_ptr<query::result_set>>;
// Filename prefix for schema commitlog segments. Declared `inline` so that all
// translation units including this header share one object, instead of each
// getting its own internal-linkage copy with a separate dynamic initializer.
inline const std::string COMMITLOG_FILENAME_PREFIX("SchemaLog-");
namespace v3 {
// Name of the keyspace holding the schema tables.
// All constants are `inline constexpr` so that including TUs share one
// definition, rather than each TU getting an internal-linkage copy.
inline constexpr auto NAME = "system_schema";
// Names of the schema tables.
inline constexpr auto KEYSPACES = "keyspaces";
inline constexpr auto SCYLLA_KEYSPACES = "scylla_keyspaces";
inline constexpr auto TABLES = "tables";
inline constexpr auto SCYLLA_TABLES = "scylla_tables";
inline constexpr auto COLUMNS = "columns";
inline constexpr auto DROPPED_COLUMNS = "dropped_columns";
inline constexpr auto TRIGGERS = "triggers";
inline constexpr auto VIEWS = "views";
inline constexpr auto TYPES = "types";
inline constexpr auto FUNCTIONS = "functions";
inline constexpr auto AGGREGATES = "aggregates";
inline constexpr auto SCYLLA_AGGREGATES = "scylla_aggregates";
inline constexpr auto INDEXES = "indexes";
inline constexpr auto VIEW_VIRTUAL_COLUMNS = "view_virtual_columns"; // Scylla specific
inline constexpr auto COMPUTED_COLUMNS = "computed_columns"; // Scylla specific
// Scylla specific. Unlike the tables above, this table belongs to the
// "system" keyspace (see scylla_table_schema_history()).
inline constexpr auto SCYLLA_TABLE_SCHEMA_HISTORY = "scylla_table_schema_history";
// Accessors returning the schema_ptr of individual schema tables; the
// definitions live out of line.
schema_ptr keyspaces();
schema_ptr columns();
schema_ptr view_virtual_columns();
schema_ptr dropped_columns();
schema_ptr indexes();
schema_ptr tables();
// Takes the schema features to build the schema for; defaults to
// schema_features::full().
schema_ptr scylla_tables(schema_features features = schema_features::full());
schema_ptr views();
schema_ptr types();
schema_ptr functions();
schema_ptr aggregates();
schema_ptr computed_columns();
// Belongs to the "system" keyspace (not to system_schema like the ones above).
schema_ptr scylla_table_schema_history();
// Returns table ids of all schema tables which contribute to schema_mutations,
// i.e. those which are used to define schema of a table or a view.
// All such tables have a clustering key whose first column is the table name.
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
}
namespace legacy {
class schema_mutations {
mutation _columnfamilies;
mutation _columns;
public:
schema_mutations(mutation columnfamilies, mutation columns)
: _columnfamilies(std::move(columnfamilies))
, _columns(std::move(columns))
{ }
table_schema_version digest() const;
};
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s);
}
struct qualified_name {
sstring keyspace_name;
sstring table_name;
qualified_name(sstring keyspace_name, sstring table_name)
: keyspace_name(std::move(keyspace_name))
, table_name(std::move(table_name))
{ }
qualified_name(const schema_ptr& s)
: keyspace_name(s->ks_name())
, table_name(s->cf_name())
{ }
auto operator<=>(const qualified_name&) const = default;
};
// Counterpart of legacy::read_table_mutations() that takes a qualified_name and
// yields the (non-legacy) schema_mutations type.
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s);
// Make the v3 table names and schema accessors usable unqualified within schema_tables.
using namespace v3;
// Must be changed whenever schema mutations change in a backward-incompatible way.
// Replication of schema between nodes with different versions is inhibited.
extern const sstring version;
// Returns schema_ptrs for all schema tables supported by given schema_features.
std::vector<schema_ptr> all_tables(schema_features);
// Like all_tables(), but returns table_info of each table.
std::vector<table_info> all_table_infos(schema_features);
// saves/creates all the system objects in the appropriate keyspaces;
// deletes them first, so they will be effectively overwritten.
future<> save_system_schema(cql3::query_processor& qp);
// Overload restricted by `accept_keyspace`, which presumably selects the
// keyspaces (by name) to include in the digest — confirm in the definition.
future<table_schema_version> calculate_schema_digest(sharded<service::storage_proxy>& proxy, schema_features, noncopyable_function<bool(std::string_view)> accept_keyspace);
// Calculates schema digest for all non-system keyspaces
future<table_schema_version> calculate_schema_digest(sharded<service::storage_proxy>& proxy, schema_features);
// Must be called on shard 0.
future<semaphore_units<>> hold_merge_lock() noexcept;
// Runs `func` while holding the schema merge lock (per the name; TODO confirm
// whether the shard-0 requirement above applies here too).
future<> with_merge_lock(noncopyable_function<future<> ()> func);
future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, schema_features features, std::optional<table_schema_version> version_from_group0);
future<std::optional<table_schema_version>> get_group0_schema_version(db::system_keyspace& sys_ks);
// Recalculates the local schema version.
//
// It is safe to call concurrently with recalculate_schema_version() and merge_schema() in which case it
// is guaranteed that the schema version we end up with after all calls will reflect the most recent state
// of feature_service and schema tables.
future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, gms::feature_service& feat);
future<utils::chunked_vector<canonical_mutation>> convert_schema_to_mutations(sharded<service::storage_proxy>& proxy, schema_features);
utils::chunked_vector<mutation> adjust_schema_for_schema_features(utils::chunked_vector<mutation> schema, schema_features features);
// Keyspace-level schema: reading schema partitions and building/dropping keyspaces.
future<schema_result_value_type>
read_schema_partition_for_keyspace(sharded<service::storage_proxy>& proxy, sstring schema_table_name, sstring keyspace_name);
future<mutation> read_keyspace_mutation(sharded<service::storage_proxy>&, const sstring& keyspace_name);
utils::chunked_vector<mutation> make_create_keyspace_mutations(schema_features features, lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
utils::chunked_vector<mutation> make_drop_keyspace_mutations(schema_features features, lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp);
future<lw_shared_ptr<keyspace_metadata>> create_keyspace_metadata(const schema_result_value_type& partition, lw_shared_ptr<query::result_set> scylla_specific_rs);
future<lw_shared_ptr<query::result_set>> extract_scylla_specific_keyspace_info(sharded<service::storage_proxy>& proxy, const schema_result_value_type& partition);
// User-defined types.
utils::chunked_vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
// Given a set of rows that is sorted by keyspace, create types for each keyspace.
// The topological sort in each keyspace is necessary when creating types, since we can only create a type when the
// types it references have already been created.
future<std::vector<user_type>> create_types(replica::database& db, const std::vector<const query::result_set_row*>& rows, std::map<sstring, std::reference_wrapper<replica::keyspace>>& new_keyspaces);
future<std::vector<user_type>> create_types_from_schema_partition(keyspace_metadata& ks, lw_shared_ptr<query::result_set> result);
// User-defined functions and aggregates.
std::vector<data_type> read_arg_types(const query::result_set_row& row, const sstring& keyspace, const data_dictionary::user_types_storage& user_types);
future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row, const data_dictionary::user_types_storage& user_types);
seastar::future<std::vector<shared_ptr<cql3::functions::user_function>>> create_functions_from_schema_partition(replica::database& db, lw_shared_ptr<query::result_set> result);
// `scylla_row` is a pointer and so may presumably be null when there is no
// Scylla-specific row for the aggregate — confirm in the definition.
shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row, cql3::functions::change_batch& batch, const data_dictionary::user_types_storage& user_types);
std::vector<shared_ptr<cql3::functions::user_aggregate>> create_aggregates_from_schema_partition(replica::database& db, lw_shared_ptr<query::result_set> result, lw_shared_ptr<query::result_set> scylla_result, cql3::functions::change_batch& batch);
utils::chunked_vector<mutation> make_create_function_mutations(shared_ptr<cql3::functions::user_function> func, api::timestamp_type timestamp);
utils::chunked_vector<mutation> make_drop_function_mutations(shared_ptr<cql3::functions::user_function> func, api::timestamp_type timestamp);
utils::chunked_vector<mutation> make_create_aggregate_mutations(schema_features features, shared_ptr<cql3::functions::user_aggregate> func, api::timestamp_type timestamp);
utils::chunked_vector<mutation> make_drop_aggregate_mutations(schema_features features, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type timestamp);
utils::chunked_vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
// Adds the mutations for `type` to `mutations` (output parameter).
void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp, utils::chunked_vector<mutation>& mutations);
// Tables.
utils::chunked_vector<mutation> make_create_table_mutations(schema_ptr table, api::timestamp_type timestamp);
utils::chunked_vector<mutation> make_update_table_mutations(
service::storage_proxy& sp,
lw_shared_ptr<keyspace_metadata> keyspace,
schema_ptr old_table,
schema_ptr new_table,
api::timestamp_type timestamp);
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(sharded<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
utils::chunked_vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
// `cdc_schema` is the CDC log schema to attach to the resulting table schema;
// it may be null (for now callers always pass nullptr).
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage& user_types, schema_ptr cdc_schema, std::optional<table_schema_version> version = {});
// Views. Two overloads: one takes a schema_ptr (presumably the base table's
// schema), the other an optional base_dependent_view_info.
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage&, schema_ptr, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage&, std::optional<view::base_dependent_view_info> = {}, std::optional<table_schema_version> version = {});
future<std::vector<view_ptr>> create_views_from_schema_partition(sharded<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
schema_mutations make_schema_mutations(schema_ptr s, api::timestamp_type timestamp, bool with_columns);
mutation make_scylla_tables_mutation(schema_ptr, api::timestamp_type timestamp);
// Adds the mutations for a table or view schema to `mutations` (output parameter).
void add_table_or_view_to_schema_mutation(schema_ptr view, api::timestamp_type timestamp, bool with_columns, utils::chunked_vector<mutation>& mutations);
utils::chunked_vector<mutation> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
utils::chunked_vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr old_view, view_ptr new_view, api::timestamp_type timestamp, bool include_base);
utils::chunked_vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view_ptr& v, schema_ptr base_schema);
bool view_should_exist(const index_metadata& im);
// Conversions between enum kinds / types and their string form in schema tables.
sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);
data_type parse_type(sstring str);
sstring serialize_index_kind(index_metadata_kind kind);
index_metadata_kind deserialize_index_kind(sstring kind);
// Schema digest helpers.
mutation compact_for_schema_digest(const mutation& m);
void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features);
/// Reads the map-typed column `name` of `row` into a std::map<K, V>.
///
/// Every key is converted with value_cast<K> and every value with
/// value_cast<V>. Returns std::nullopt when row.get() yields nothing for the
/// column. Because std::map::emplace never overwrites, if two entries map to
/// the same converted key the first one wins.
template<typename K, typename V>
std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {
    auto values = row.get<map_type_impl::native_type>(name);
    if (!values) {
        return std::nullopt;
    }
    std::map<K, V> map;
    for (const auto& [key, value] : *values) {
        map.emplace(value_cast<K>(key), value_cast<V>(value));
    }
    return map;
}
/// Stores the column mapping for the table being created or altered in the system table
/// (`system.scylla_table_schema_history`) which holds a history of schema versions along with
/// their column mappings.
/// Can be used to insert entries with a TTL (equal to DEFAULT_GC_GRACE_SECONDS) when overwriting an
/// existing column mapping, so that obsolete entries get garbage-collected (presumably selected by
/// `with_ttl` — confirm in the definition).
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
/// Queries the column mapping for a given version of the table locally.
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Checks whether a column mapping exists for a given version of the table.
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Deletes matching column mapping entries from the `system.scylla_table_schema_history` table.
future<> drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
} // namespace schema_tables
} // namespace db