Files
scylladb/db/schema_applier.hh
Gleb Natapov 08e33ad7f7 schema: drop recalculate_schema_version function and its uses
There is no need to recalculate schema version any more since it is set
by group0.
2026-03-10 10:46:39 +02:00

241 lines
8.9 KiB
C++

/*
* Modified by ScyllaDB
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "schema/frozen_schema.hh"
#include "mutation/mutation.hh"
#include <seastar/core/future.hh>
#include "service/storage_proxy.hh"
#include "query/query-result-set.hh"
#include "db/schema_tables.hh"
#include "data_dictionary/user_types_metadata.hh"
#include "schema/schema_registry.hh"
#include "service/storage_service.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "replica/tables_metadata_lock.hh"
#include <seastar/core/sharded.hh>
#include <unordered_map>
namespace db {
namespace schema_tables {
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, utils::chunked_vector<mutation> mutations, bool reload = false);
enum class table_kind { table, view };
struct table_selector {
bool all_in_keyspace = false; // If true, selects all existing tables in a keyspace plus what's in "tables";
std::unordered_map<table_kind, std::unordered_set<sstring>> tables;
table_selector& operator+=(table_selector&& o);
void add(table_kind t, sstring name);
void add(sstring name);
};
struct schema_persisted_state {
schema_tables::schema_result keyspaces;
schema_tables::schema_result scylla_keyspaces;
std::map<table_id, schema_mutations> tables;
schema_tables::schema_result types;
std::map<table_id, schema_mutations> views;
std::map<table_id, schema_mutations> cdc;
schema_tables::schema_result functions;
schema_tables::schema_result aggregates;
schema_tables::schema_result scylla_aggregates;
};
struct affected_keyspaces_names {
std::set<sstring> created;
std::set<sstring> altered;
std::set<sstring> dropped;
};
// groups keyspaces based on what is happening to them during schema change
struct affected_keyspaces {
std::vector<replica::database::created_keyspace_per_shard> created;
std::vector<replica::database::keyspace_change_per_shard> altered;
// names need to be copied here as they are used multiple times and
// keyspace struct from which we obtain the name is moved when
// we commit it
affected_keyspaces_names names;
};
struct affected_user_types_per_shard {
std::vector<user_type> created;
std::vector<user_type> altered;
std::vector<user_type> dropped;
};
// groups UDTs based on what is happening to them during schema change
using affected_user_types = sharded<affected_user_types_per_shard>;
// In_progress_types_storage_per_shard contains current
// types with in-progress modifications applied.
// Important note: this storage can't be used directly in all cases,
// e.g. it's legal to drop type together with dropping other entity
// in such case we use existing storage instead so that whatever
// is being dropped can reference this type (we remove it from in_progress storage)
// in such cases get proper storage via committed_storage().
class in_progress_types_storage_per_shard : public data_dictionary::user_types_storage {
std::shared_ptr<data_dictionary::user_types_storage> _stored_user_types;
std::map<sstring, data_dictionary::user_types_metadata> _in_progress_types;
public:
in_progress_types_storage_per_shard(replica::database& db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override;
std::shared_ptr<data_dictionary::user_types_storage> committed_storage();
};
class in_progress_types_storage {
// wrapped in foreign_ptr so they can be destroyed on the right shard
std::vector<foreign_ptr<shared_ptr<in_progress_types_storage_per_shard>>> shards;
public:
in_progress_types_storage() : shards(smp::count) {}
future<> init(sharded<replica::database>& sharded_db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
in_progress_types_storage_per_shard& local();
};
struct frozen_schema_diff {
struct altered_schema {
extended_frozen_schema old_schema;
extended_frozen_schema new_schema;
};
std::vector<extended_frozen_schema> created;
std::vector<altered_schema> altered;
std::vector<extended_frozen_schema> dropped;
};
// schema_diff represents what is happening with tables or views during schema merge
struct schema_diff_per_shard {
struct altered_schema {
schema_ptr old_schema;
schema_ptr new_schema;
};
std::vector<schema_ptr> created;
std::vector<altered_schema> altered;
std::vector<schema_ptr> dropped;
future<frozen_schema_diff> freeze() const;
static future<schema_diff_per_shard> copy_from(replica::database&, in_progress_types_storage&, const frozen_schema_diff& oth);
};
class pending_token_metadata {
std::vector<locator::mutable_token_metadata_ptr> shards{smp::count};
public:
future<> assign(locator::mutable_token_metadata_ptr new_token_metadata);
locator::mutable_token_metadata_ptr& local();
future<> destroy();
};
struct affected_tables_and_views_per_shard {
schema_diff_per_shard tables;
schema_diff_per_shard cdc;
schema_diff_per_shard views;
std::vector<bool> columns_changed;
};
struct affected_tables_and_views {
sharded<affected_tables_and_views_per_shard> tables_and_views;
std::unordered_map<table_id, replica::global_table_ptr> table_shards;
};
// We wrap it with pointer because change_batch needs to be constructed and destructed
// on the same shard as it's used for.
using functions_change_batch_all_shards = sharded<cql3::functions::change_batch>;
class pending_schema_getter;
// Schema_applier encapsulates intermediate state needed to construct schema objects from
// set of rows read from system tables (see struct schema_state). It does atomic (per shard)
// application of a new schema.
class schema_applier {
using keyspace_name = sstring;
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<db::system_keyspace>& _sys_ks;
const bool _reload;
std::set<sstring> _keyspaces;
std::unordered_map<keyspace_name, table_selector> _affected_tables;
// Copy of token metadata used for schema change, it has two purposes:
// - makes sure all updated schema entities use the same metadata
// - allows to change tablets metadata without immediately committing it.
locator::pending_token_metadata _pending_token_metadata;
locator::tablet_metadata_change_hint _tablet_hint;
service::token_metadata_change _token_metadata_change;
schema_persisted_state _before;
schema_persisted_state _after;
in_progress_types_storage _types_storage;
affected_keyspaces _affected_keyspaces;
affected_user_types _affected_user_types;
affected_tables_and_views _affected_tables_and_views;
std::unique_ptr<replica::tables_metadata_lock_on_all_shards> _metadata_locks;
functions_change_batch_all_shards _functions_batch; // includes aggregates
future<schema_persisted_state> get_schema_persisted_state();
future<> load_mutable_token_metadata();
friend pending_schema_getter;
public:
schema_applier(
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<db::system_keyspace>& sys_ks,
bool reload = false)
: _proxy(proxy), _ss(ss), _sys_ks(sys_ks), _reload(reload) {};
// Gets called before mutations are applied,
// preferably no work should be done here but subsystem
// may do some snapshot of 'before' data.
future<> prepare(utils::chunked_vector<mutation>& muts);
// Update is called after mutations are applied, it should create
// all updates but not yet commit them to a subsystem (i.e. copy on write style).
// All changes should be visible only to schema_applier object but not to other subsystems.
future<> update();
// Makes updates visible. Before calling this function in memory state as observed by other
// components should not yet change. The function atomically switches current state with
// new state (the one built in update function).
future<> commit();
// Post_commit is called after commit and allows to trigger code which can't provide
// atomicity either for legacy reasons or causes side effects to an external system
// (e.g. informing client's driver).
future<> post_commit();
// Some destruction may need to be done on particular shard hence we need to run it in coroutine.
future<> destroy();
private:
future<> merge_keyspaces();
future<> merge_types();
future<> merge_tables_and_views();
future<> update_tablets();
future<> merge_functions();
future<> merge_aggregates();
void commit_tables_and_views();
future<> finalize_tables_and_views();
void commit_on_shard(replica::database& db);
};
}
}