/*
 * Modified by ScyllaDB
 * Copyright (C) 2024-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
 */

// Declarations used to merge schema mutations into the in-memory schema:
// the merge_schema() entry point, the schema_applier class which applies a new
// schema in phases (prepare / update / commit / post_commit / destroy), and the
// intermediate per-shard state that schema_applier builds along the way.
//
// NOTE(review): this copy appears to have lost all angle-bracket contents.
// Several #include directives below have no header name, and template
// arguments are missing throughout (e.g. `sharded&`, `std::vector created`,
// `std::unordered_map> tables`). Restore from version control before building.
// Comments below that mention element types are hedged for that reason.

#pragma once

#include "schema/frozen_schema.hh"
#include "mutation/mutation.hh"
#include
#include "service/storage_proxy.hh"
#include "query/query-result-set.hh"
#include "db/schema_tables.hh"
#include "data_dictionary/user_types_metadata.hh"
#include "schema/schema_registry.hh"
#include "service/storage_service.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "replica/tables_metadata_lock.hh"

#include
#include

namespace db {

namespace schema_tables {

// Entry point for a schema merge: applies the given schema `mutations`.
// `reload` defaults to false. schema_applier's constructor takes a parameter
// with the same name and default.
// NOTE(review): the exact effect of `reload` is defined in the implementation,
// which is not visible here.
future<> merge_schema(sharded& sys_ks, sharded& proxy, sharded& ss, gms::feature_service& feat, utils::chunked_vector mutations, bool reload = false);

// Kind of schema entity referred to by a table_selector entry.
enum class table_kind { table, view };

// Selects a set of tables and/or views.
// `tables` is presumably keyed by table_kind (its template arguments are lost
// in this copy).
struct table_selector {
    bool all_in_keyspace = false; // If true, selects all existing tables in a keyspace plus what's in "tables";
    std::unordered_map> tables;

    // Merges `o` into this selector (presumably a union; see the implementation).
    table_selector& operator+=(table_selector&& o);
    // Adds `name` under kind `t`.
    void add(table_kind t, sstring name);
    // Adds `name` without an explicit kind.
    // NOTE(review): which kind it is filed under is decided by the implementation; confirm.
    void add(sstring name);
};

// Schema rows as persisted in the schema tables, grouped by entity kind.
// schema_applier keeps two of these (_before and _after), presumably to
// determine what the applied mutations changed.
struct schema_persisted_state {
    schema_tables::schema_result keyspaces;
    schema_tables::schema_result scylla_keyspaces;
    std::map tables;
    schema_tables::schema_result types;
    std::map views;
    std::map cdc;
    schema_tables::schema_result functions;
    schema_tables::schema_result aggregates;
    schema_tables::schema_result scylla_aggregates;
};

// Names of the keyspaces created, altered or dropped by a schema change.
struct affected_keyspaces_names {
    std::set created;
    std::set altered;
    std::set dropped;
};

// Groups keyspaces based on what is happening to them during a schema change.
// Dropped keyspaces are tracked by name only (names.dropped).
struct affected_keyspaces {
    std::vector created;
    std::vector altered;
    // Names need to be copied here because they are used multiple times, and
    // the keyspace struct we obtain each name from is moved when we commit it.
    affected_keyspaces_names names;
};

// User-defined types created, altered or dropped by a schema change, as seen
// on a single shard.
struct affected_user_types_per_shard {
    std::vector created;
    std::vector altered;
    std::vector dropped;
};

// Groups UDTs based on what is happening to them during a schema change,
// with one instance per shard. The template argument is lost in this copy;
// presumably it is affected_user_types_per_shard.
using affected_user_types = sharded;

// Per-shard user-types storage exposing the current types with the
// in-progress modifications applied.
//
// Important: this storage cannot be used directly in every case. It is legal
// to drop a type in the same schema change that drops another entity
// referencing it. In that case the type is removed from the in-progress
// storage. Whatever is being dropped must therefore still be able to reference
// the type through the existing storage. Obtain that storage via
// committed_storage().
class in_progress_types_storage_per_shard : public data_dictionary::user_types_storage {
    // Existing (committed) storage; presumably what committed_storage() returns.
    std::shared_ptr _stored_user_types;
    // Types with in-progress modifications applied
    // (presumably keyed by keyspace name; confirm).
    std::map _in_progress_types;
public:
    in_progress_types_storage_per_shard(replica::database& db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
    // Returns the user types metadata for keyspace `ks`
    // (override of data_dictionary::user_types_storage::get).
    virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override;
    // Storage to use when an entity being dropped must still see a type
    // dropped in the same change (see the class comment).
    std::shared_ptr committed_storage();
};

// Holds one in_progress_types_storage_per_shard per shard.
class in_progress_types_storage {
    // wrapped in foreign_ptr so they can be destroyed on the right shard
    std::vector>> shards;
public:
    // One (initially empty) slot per shard.
    in_progress_types_storage() : shards(smp::count) {}
    // Presumably creates each shard's instance from the affected keyspaces and
    // types; the arguments mirror the per-shard constructor.
    future<> init(sharded& sharded_db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
    // Returns the calling shard's storage.
    in_progress_types_storage_per_shard& local();
};

// Frozen counterpart of schema_diff_per_shard: it holds extended_frozen_schema
// values instead of schema_ptr. It is consumed by
// schema_diff_per_shard::copy_from().
struct frozen_schema_diff {
    struct altered_schema {
        extended_frozen_schema old_schema;
        extended_frozen_schema new_schema;
    };
    std::vector created;
    std::vector altered;
    std::vector dropped;
};

// schema_diff_per_shard represents what is happening with tables or views
// during a schema merge, as seen on a single shard.
struct schema_diff_per_shard {
    struct altered_schema {
        schema_ptr old_schema;
        schema_ptr new_schema;
    };
    std::vector created;
    std::vector altered;
    std::vector dropped;

    // Converts this diff to its frozen form.
    // The return type is lost in this copy; presumably future<frozen_schema_diff>.
    future freeze() const;
    // Builds a per-shard diff from a frozen one, using the given database and
    // in-progress types storage (presumably to resolve user types).
    static future copy_from(replica::database&, in_progress_types_storage&, const frozen_schema_diff& oth);
};

// Holds one mutable token metadata pointer per shard (sized to smp::count).
class pending_token_metadata {
    std::vector shards{smp::count};
public:
    // Stores `new_token_metadata` for the shards (presumably a per-shard copy; confirm).
    future<> assign(locator::mutable_token_metadata_ptr new_token_metadata);
    // Returns the calling shard's pointer.
    locator::mutable_token_metadata_ptr& local();
    // Releases the held metadata; asynchronous, presumably so each shard's
    // copy is released on its own shard.
    future<> destroy();
};

// Table, CDC and view diffs on a single shard, plus the changed columns
// (element type lost in this copy).
struct affected_tables_and_views_per_shard {
    schema_diff_per_shard tables;
    schema_diff_per_shard cdc;
    schema_diff_per_shard views;
    std::vector columns_changed;
};

// Per-shard table/view diffs, plus a table_shards map.
// NOTE(review): table_shards' key/value types are lost in this copy; the
// include of replica/global_table_ptr.hh suggests global table pointers. Confirm.
struct affected_tables_and_views {
    sharded tables_and_views;
    std::unordered_map table_shards;
};

// One function/aggregate change batch per shard. A change_batch needs to be
// constructed and destroyed on the same shard that uses it, hence the
// per-shard wrapping.
// NOTE(review): the original wording said "wrap it with pointer", but as
// written this alias is a sharded<> instantiation. Confirm.
using functions_change_batch_all_shards = sharded;

// Forward declaration; granted friend access by schema_applier.
class pending_schema_getter;

// schema_applier encapsulates the intermediate state needed to construct schema
// objects from the set of rows read from system tables
// (see struct schema_persisted_state). It performs an atomic (per shard)
// application of a new schema, in the phases declared below:
// prepare() -> update() -> commit() -> post_commit(), plus destroy().
class schema_applier {
    using keyspace_name = sstring;

    // Services passed to the constructor. Their template arguments are lost in
    // this copy; presumably storage_proxy, storage_service and system_keyspace,
    // matching the parameter names.
    sharded& _proxy;
    sharded& _ss;
    sharded& _sys_ks;
    const bool _reload;
    std::set _keyspaces;
    std::unordered_map _affected_tables;
    // Copy of token metadata used for the schema change. It has two purposes:
    // - it makes sure all updated schema entities use the same metadata;
    // - it allows changing tablets metadata without immediately committing it.
    // NOTE(review): this member names locator::pending_token_metadata, while
    // the class declared earlier in this header is
    // db::schema_tables::pending_token_metadata. Confirm which is intended.
    locator::pending_token_metadata _pending_token_metadata;
    locator::tablet_metadata_change_hint _tablet_hint;
    service::token_metadata_change _token_metadata_change;

    // Persisted schema state before and after the mutations are applied
    // (presumably filled in by prepare() and update() respectively).
    schema_persisted_state _before;
    schema_persisted_state _after;

    // Intermediate results built while merging (presumably in update()).
    in_progress_types_storage _types_storage;
    affected_keyspaces _affected_keyspaces;
    affected_user_types _affected_user_types;
    affected_tables_and_views _affected_tables_and_views;
    // Pointee type lost in this copy (presumably from replica/tables_metadata_lock.hh).
    std::unique_ptr _metadata_locks;
    functions_change_batch_all_shards _functions_batch; // includes aggregates

    // Reads the currently persisted schema state
    // (return type lost; presumably future<schema_persisted_state>).
    future get_schema_persisted_state();
    future<> load_mutable_token_metadata();

    friend pending_schema_getter;
public:
    schema_applier(
            sharded& proxy,
            sharded& ss,
            sharded& sys_ks,
            bool reload = false)
        : _proxy(proxy), _ss(ss), _sys_ks(sys_ks), _reload(reload) {};

    // Called before the mutations are applied. Preferably no work should be
    // done here, but a subsystem may take a snapshot of the 'before' data.
    future<> prepare(utils::chunked_vector& muts);
    // Called after the mutations are applied. It should create all updates
    // but not yet commit them to any subsystem (copy-on-write style).
    // All changes should be visible only to the schema_applier object, not to
    // other subsystems.
    future<> update();
    // Makes the updates visible. Before this call, in-memory state as observed
    // by other components should not yet have changed. The function atomically
    // switches the current state with the new state built in update().
    future<> commit();
    // Called after commit(). It allows triggering code that cannot provide
    // atomicity, either for legacy reasons or because it causes side effects
    // in an external system (e.g. informing clients' drivers).
    future<> post_commit();
    // Some destruction may need to be done on a particular shard, hence this
    // is an asynchronous function (run as a coroutine).
    future<> destroy();

private:
    // Per-entity merge steps (presumably driven by update()).
    future<> merge_keyspaces();
    future<> merge_types();
    future<> merge_tables_and_views();
    future<> update_tablets();
    future<> merge_functions();
    future<> merge_aggregates();
    // Commit helpers (presumably used by commit() to apply the prepared state).
    void commit_tables_and_views();

    future<> finalize_tables_and_views();
    void commit_on_shard(replica::database& db);
};

} // namespace schema_tables

} // namespace db