Files
scylladb/db/schema_applier.hh
Michael Litvak 4fe13c04a9 db: schema_applier: extract cdc tables
Previously in the schema applier we have two maps of schema_mutations,
for tables and for views. Now create another map for CDC tables by
extracting them from the non-views tables map.

We maintain the previous behavior by applying each operation that's done
on the tables map, to the CDC map as well.

Later we will want to handle CDC and non-CDC tables differently. We want
to be able to create all CDC schemas first, so when we create the
non-CDC tables we can create them with a pointer to their CDC schemas.
2025-10-21 14:13:43 +02:00

241 lines
8.9 KiB
C++

/*
* Modified by ScyllaDB
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "schema/frozen_schema.hh"
#include "mutation/mutation.hh"
#include <seastar/core/future.hh>
#include "service/storage_proxy.hh"
#include "query/query-result-set.hh"
#include "db/schema_tables.hh"
#include "data_dictionary/user_types_metadata.hh"
#include "schema/schema_registry.hh"
#include "service/storage_service.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "replica/tables_metadata_lock.hh"
#include <seastar/core/sharded.hh>
#include <unordered_map>
namespace db {
namespace schema_tables {
// Declares the schema-merge entry point. It receives the sharded system keyspace,
// storage proxy and storage service, the feature service, and the mutations to merge
// (taken by value). `reload` defaults to false.
// NOTE(review): the definition is not in this header; presumably it drives a
// schema_applier over `mutations` -- confirm there, including the exact meaning of `reload`.
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, gms::feature_service& feat, utils::chunked_vector<mutation> mutations, bool reload = false);
// Kind of schema entity a table_selector entry refers to; used as the key of
// table_selector::tables.
enum class table_kind { table, view };
// Describes a selection of tables by name, grouped by table_kind, optionally widened
// to "everything in the keyspace". schema_applier keeps one selector per keyspace name
// (see schema_applier::_affected_tables).
struct table_selector {
bool all_in_keyspace = false; // If true, selects all existing tables in a keyspace plus what's in "tables";
// Explicitly selected names, keyed by the kind of entity they name.
std::unordered_map<table_kind, std::unordered_set<sstring>> tables;
// Merges another selector into this one, consuming `o`, and returns *this.
// NOTE(review): defined out of line; presumably unions `tables` and ORs
// `all_in_keyspace` -- confirm in the implementation.
table_selector& operator+=(table_selector&& o);
// Adds `name` to the selection under kind `t`.
void add(table_kind t, sstring name);
// Adds `name` without the caller choosing a kind.
// NOTE(review): which kind(s) the name is recorded under is decided by the
// out-of-line definition -- confirm before relying on it.
void add(sstring name);
};
// Schema-related data grouped by the kind of entity it describes.
// schema_applier holds two instances (`_before` and `_after`) and produces them via
// get_schema_persisted_state().
// NOTE(review): presumably each instance is a snapshot read from the schema system
// tables before/after the mutations are applied -- confirm in the implementation.
struct schema_persisted_state {
schema_tables::schema_result keyspaces;
schema_tables::schema_result scylla_keyspaces;
// Schema mutations of tables, keyed by table id.
std::map<table_id, schema_mutations> tables;
schema_tables::schema_result types;
// Schema mutations of views, keyed by table id.
std::map<table_id, schema_mutations> views;
// Schema mutations of CDC tables, keyed by table id.
// NOTE(review): per the change description these are extracted from the non-view
// tables rather than being kept in `tables` -- confirm in the code that fills this struct.
std::map<table_id, schema_mutations> cdc;
schema_tables::schema_result functions;
schema_tables::schema_result aggregates;
schema_tables::schema_result scylla_aggregates;
};
// Keyspace names grouped by what a schema change does to them.
// Each group is an ordered set, so names are unique and iterate in sorted order.
struct affected_keyspaces_names {
std::set<sstring> created;
std::set<sstring> altered;
std::set<sstring> dropped;
};
// Groups keyspaces based on what is happening to them during a schema change.
// Only created and altered keyspaces carry per-shard objects here; dropped keyspaces
// are represented solely by their names in `names.dropped`.
struct affected_keyspaces {
std::vector<replica::database::created_keyspace_per_shard> created;
std::vector<replica::database::keyspace_change_per_shard> altered;
// The names need to be copied here because they are used multiple times, and the
// keyspace struct from which we obtain a name is moved when we commit it.
affected_keyspaces_names names;
};
// User-defined types grouped by what a schema change does to them.
// This is the per-shard payload of affected_user_types (a sharded<> of this struct).
struct affected_user_types_per_shard {
std::vector<user_type> created;
std::vector<user_type> altered;
std::vector<user_type> dropped;
};
// Groups UDTs based on what is happening to them during a schema change.
// The sharded<> wrapper holds an affected_user_types_per_shard instance per shard.
using affected_user_types = sharded<affected_user_types_per_shard>;
// in_progress_types_storage_per_shard contains the current types with in-progress
// modifications applied.
//
// Important note: this storage can't be used directly in all cases. For example, it
// is legal to drop a type together with dropping another entity; in that case we use
// the existing storage instead, so that whatever is being dropped can still reference
// the type (which has already been removed from the in-progress storage).
// In such cases obtain the proper storage via committed_storage().
class in_progress_types_storage_per_shard : public data_dictionary::user_types_storage {
// NOTE(review): presumably the already-committed storage that committed_storage()
// hands out -- confirm in the implementation.
std::shared_ptr<data_dictionary::user_types_storage> _stored_user_types;
// User types metadata with pending changes applied, keyed by name.
// NOTE(review): the key is presumably the keyspace name, matching get(ks) -- confirm.
std::map<sstring, data_dictionary::user_types_metadata> _in_progress_types;
public:
// Built from a shard's database plus the keyspaces and user types affected by the
// schema change being applied.
in_progress_types_storage_per_shard(replica::database& db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
// Implements the user_types_storage interface: returns the user types metadata for
// keyspace `ks`.
virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override;
// Returns the storage to use when the in-progress view is not appropriate
// (see the class comment).
std::shared_ptr<data_dictionary::user_types_storage> committed_storage();
};
// Container of in_progress_types_storage_per_shard instances with one slot per shard.
class in_progress_types_storage {
// wrapped in foreign_ptr so they can be destroyed on the right shard
std::vector<foreign_ptr<shared_ptr<in_progress_types_storage_per_shard>>> shards;
public:
// Sizes `shards` to smp::count default-constructed slots; nothing is populated yet.
in_progress_types_storage() : shards(smp::count) {}
// NOTE(review): defined out of line; presumably creates each shard's storage on its
// owning shard from the database and the affected keyspaces/types -- confirm.
future<> init(sharded<replica::database>& sharded_db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
// NOTE(review): presumably returns the slot belonging to the calling shard, and so
// would require init() to have completed -- confirm.
in_progress_types_storage_per_shard& local();
};
// Frozen counterpart of schema_diff_per_shard: the same created/altered/dropped
// grouping, but holding extended_frozen_schema values instead of schema_ptr.
// It is what schema_diff_per_shard::freeze() yields and what
// schema_diff_per_shard::copy_from() takes as input.
struct frozen_schema_diff {
// An altered entity: its schema before and after the change, both frozen.
struct altered_schema {
extended_frozen_schema old_schema;
extended_frozen_schema new_schema;
};
std::vector<extended_frozen_schema> created;
std::vector<altered_schema> altered;
std::vector<extended_frozen_schema> dropped;
};
// schema_diff_per_shard represents what is happening with tables or views during
// schema merge. affected_tables_and_views_per_shard holds one instance each for
// tables, CDC tables and views.
struct schema_diff_per_shard {
// An altered entity: its schema before and after the change.
struct altered_schema {
schema_ptr old_schema;
schema_ptr new_schema;
};
std::vector<schema_ptr> created;
std::vector<altered_schema> altered;
std::vector<schema_ptr> dropped;
// Asynchronously produces the frozen form of this diff.
future<frozen_schema_diff> freeze() const;
// Asynchronously builds a diff from a frozen one.
// NOTE(review): presumably unfreezes every schema for the calling shard using the
// given database and in-progress types storage -- confirm in the implementation.
static future<schema_diff_per_shard> copy_from(replica::database&, in_progress_types_storage&, const frozen_schema_diff& oth);
};
// Holds a vector of mutable token metadata pointers initialized from smp::count.
// NOTE(review): the intent appears to be one slot per shard; the brace initializer
// relies on smp::count not being convertible to the element type so that the
// size constructor is chosen -- confirm.
class pending_token_metadata {
std::vector<locator::mutable_token_metadata_ptr> shards{smp::count};
public:
// NOTE(review): defined out of line; presumably distributes `new_token_metadata`
// (or per-shard copies of it) into `shards` -- confirm.
future<> assign(locator::mutable_token_metadata_ptr new_token_metadata);
// NOTE(review): presumably returns the slot of the calling shard -- confirm.
locator::mutable_token_metadata_ptr& local();
// Asynchronous teardown of the held pointers.
// NOTE(review): presumably releases each pointer on its owning shard -- confirm.
future<> destroy();
};
// Per-shard description of how a schema change affects tables, CDC tables and views,
// each expressed as a created/altered/dropped diff.
struct affected_tables_and_views_per_shard {
schema_diff_per_shard tables;
schema_diff_per_shard cdc;
schema_diff_per_shard views;
// NOTE(review): presumably one flag per altered entry telling whether its columns
// changed; which diff(s) it is indexed against is not visible here -- confirm.
std::vector<bool> columns_changed;
};
// All-shards view of the tables and views affected by a schema change.
struct affected_tables_and_views {
// The per-shard diffs, held in a sharded<> container.
sharded<affected_tables_and_views_per_shard> tables_and_views;
// Global (cross-shard) table handles keyed by table id.
// NOTE(review): which tables get an entry (created, altered, dropped) is decided
// in the implementation -- confirm.
std::unordered_map<table_id, replica::global_table_ptr> table_shards;
};
// We wrap the batch in sharded<> because a change_batch needs to be constructed and
// destructed on the same shard as it's used for.
using functions_change_batch_all_shards = sharded<cql3::functions::change_batch>;
// Forward declaration; schema_applier names this class as a friend, giving it access
// to the applier's private state.
class pending_schema_getter;
// schema_applier carries the intermediate state required to build schema objects out of
// the rows read from the system tables (see struct schema_persisted_state) and applies
// a new schema atomically on each shard.
//
// The phases, in the order described by the comments on the public methods, are:
// prepare() -> update() -> commit() -> post_commit(), with destroy() for teardown.
class schema_applier {
private:
    using keyspace_name = sstring;

    // Services the applier works against; held by reference, not owned.
    sharded<service::storage_proxy>& _proxy;
    sharded<service::storage_service>& _ss;
    sharded<db::system_keyspace>& _sys_ks;
    // Value of the constructor's `reload` argument.
    const bool _reload;

    std::set<sstring> _keyspaces;
    // One table_selector per keyspace name.
    std::unordered_map<keyspace_name, table_selector> _affected_tables;

    // A copy of the token metadata dedicated to this schema change. It serves two purposes:
    // - every updated schema entity sees the same metadata,
    // - tablets metadata can be modified without committing it right away.
    // NOTE(review): the type is spelled locator::pending_token_metadata, while a
    // pending_token_metadata class is also declared in this namespace -- confirm
    // which one is intended.
    locator::pending_token_metadata _pending_token_metadata;
    locator::tablet_metadata_change_hint _tablet_hint;
    service::token_metadata_change _token_metadata_change;

    // Persisted schema state captured at two points of the change.
    schema_persisted_state _before;
    schema_persisted_state _after;

    in_progress_types_storage _types_storage;
    affected_keyspaces _affected_keyspaces;
    affected_user_types _affected_user_types;
    affected_tables_and_views _affected_tables_and_views;
    std::unique_ptr<replica::tables_metadata_lock_on_all_shards> _metadata_locks;
    // Function changes; aggregates are part of this batch as well.
    functions_change_batch_all_shards _functions_batch;

    future<schema_persisted_state> get_schema_persisted_state();
    future<> load_mutable_token_metadata();

    friend class pending_schema_getter;

public:
    // Binds the applier to the given services and records the `reload` flag
    // (false unless the caller says otherwise). No other work is done here.
    schema_applier(
            sharded<service::storage_proxy>& proxy,
            sharded<service::storage_service>& ss,
            sharded<db::system_keyspace>& sys_ks,
            bool reload = false)
        : _proxy(proxy)
        , _ss(ss)
        , _sys_ks(sys_ks)
        , _reload(reload)
    {}

    // Invoked before the mutations are applied. Ideally it does no work, although a
    // subsystem may take a snapshot of its 'before' data at this point.
    future<> prepare(utils::chunked_vector<mutation>& muts);

    // Invoked after the mutations are applied. It should build every update without
    // committing any of them to a subsystem yet (copy-on-write style): the changes
    // should be visible to this schema_applier object only, not to other subsystems.
    future<> update();

    // Publishes the updates. Until this is called, the in-memory state observed by
    // other components should remain unchanged. The function atomically swaps the
    // current state with the new state built by update().
    future<> commit();

    // Invoked after commit(). This is the place for code that cannot offer atomicity,
    // whether for legacy reasons or because it has side effects on an external system
    // (e.g. notifying a client's driver).
    future<> post_commit();

    // Part of the teardown may have to run on a particular shard, which is why it is
    // an asynchronous function rather than a destructor.
    future<> destroy();

private:
    future<> merge_keyspaces();
    future<> merge_types();
    future<> merge_tables_and_views();
    future<> update_tablets();
    future<> merge_functions();
    future<> merge_aggregates();

    void commit_tables_and_views();
    future<> finalize_tables_and_views();

    void commit_on_shard(replica::database& db);
};
}
}