diff --git a/data_dictionary/user_types_metadata.hh b/data_dictionary/user_types_metadata.hh index b2265b29a6..b998c588de 100644 --- a/data_dictionary/user_types_metadata.hh +++ b/data_dictionary/user_types_metadata.hh @@ -36,4 +36,17 @@ public: friend std::ostream& operator<<(std::ostream& os, const user_types_metadata& m); }; +class user_types_storage { +public: + virtual const user_types_metadata& get(const sstring& ks) const = 0; +}; + +class dummy_user_types_storage : public user_types_storage { + user_types_metadata _empty; +public: + virtual const user_types_metadata& get(const sstring& ks) const override { + return _empty; + } +}; + } diff --git a/db/cql_type_parser.cc b/db/cql_type_parser.cc index ee1c92eea9..6a81cbfcfd 100644 --- a/db/cql_type_parser.cc +++ b/db/cql_type_parser.cc @@ -14,7 +14,6 @@ #include #include "replica/database.hh" -#include "service/storage_proxy.hh" #include "cql3/CqlParser.hpp" #include "cql3/util.hh" #include "cql_type_parser.hh" @@ -28,7 +27,7 @@ static ::shared_ptr parse_raw(const sstring& str) { }); } -data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str) { +data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str, const data_dictionary::user_types_storage& uts) { static const thread_local std::unordered_map native_types = []{ std::unordered_map res; for (auto& nt : cql3::cql3_type::values()) { @@ -42,12 +41,8 @@ data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str return i->second.get_type(); } - const auto& sp = service::get_storage_proxy(); - const replica::user_types_metadata& user_types = - sp.local_is_initialized() ? sp.local().get_db().local().find_keyspace(keyspace).metadata()->user_types() - : replica::user_types_metadata{}; auto raw = parse_raw(str); - return raw->prepare_internal(keyspace, user_types).get_type(); + return raw->prepare_internal(keyspace, uts.get(keyspace)).get_type(); } class db::cql_type_parser::raw_builder::impl { diff --git a/db/cql_type_parser.hh b/db/cql_type_parser.hh index 2b4d0fd8a8..a88a1e0c1a 100644 --- a/db/cql_type_parser.hh +++ b/db/cql_type_parser.hh @@ -24,12 +24,13 @@ class types_metadata; namespace data_dictionary { class keyspace_metadata; +class user_types_storage; } namespace db { namespace cql_type_parser { -data_type parse(const sstring& keyspace, const sstring& type); +data_type parse(const sstring& keyspace, const sstring& type, const data_dictionary::user_types_storage& uts); class raw_builder { public: diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 1aa9ccf2a4..4c1945a968 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -91,14 +91,15 @@ static bool is_extra_durable(const sstring& ks_name, const sstring& cf_name) { /** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ namespace db { -schema_ctxt::schema_ctxt(const db::config& cfg) +schema_ctxt::schema_ctxt(const db::config& cfg, std::shared_ptr uts) : _extensions(cfg.extensions()) , _murmur3_partitioner_ignore_msb_bits(cfg.murmur3_partitioner_ignore_msb_bits()) , _schema_registry_grace_period(cfg.schema_registry_grace_period()) + , _user_types(std::move(uts)) {} schema_ctxt::schema_ctxt(const replica::database& db) - : schema_ctxt(db.get_config()) + : schema_ctxt(db.get_config(), db.as_user_types_storage()) {} schema_ctxt::schema_ctxt(distributed& db) @@ -164,6 +165,7 @@ using computed_columns_map = std::unordered_map; static computed_columns_map get_computed_columns(const schema_mutations& sm); static std::vector create_columns_from_column_rows( + const schema_ctxt& ctxt, const query::result_set& rows, const sstring& keyspace, const sstring& table, bool is_super, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns); @@ -1506,10 +1508,10 @@ static future merge_types(distributed read_arg_types(const query::result_set_row& row, const sstring& keyspace) { +static std::vector read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace) { std::vector arg_types; for (const auto& arg : get_list(row, "argument_types")) { - arg_types.push_back(db::cql_type_parser::parse(keyspace, arg)); + arg_types.push_back(db::cql_type_parser::parse(keyspace, arg, db.user_types())); } return arg_types; } @@ -1572,8 +1574,8 @@ static std::vector read_arg_types(const query::result_set_row& row, c static shared_ptr create_func(replica::database& db, const query::result_set_row& row) { cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("function_name")}; - auto arg_types = read_arg_types(row, name.keyspace); - data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("return_type")); + auto arg_types = read_arg_types(db, row, name.keyspace); + data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("return_type"), db.user_types()); // FIXME: We already computed the bitcode in // create_function_statement, but it is not clear how to get it @@ -1610,8 +1612,8 @@ static shared_ptr create_func(replica::database& static shared_ptr create_aggregate(replica::database& db, const query::result_set_row& row) { cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("aggregate_name")}; - auto arg_types = read_arg_types(row, name.keyspace); - data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("state_type")); + auto arg_types = read_arg_types(db, row, name.keyspace); + data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("state_type"), db.user_types()); sstring sfunc = row.get_nonnull("state_func"); auto ffunc = row.get("final_func"); auto initcond_str = row.get("initcond"); @@ -2661,6 +2663,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations auto computed_columns = get_computed_columns(sm); std::vector column_defs = create_columns_from_column_rows( + ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name,/*, @@ -2691,7 +2694,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations query::result_set dcr(*sm.dropped_columns_mutation()); for (auto& row : dcr.rows()) { auto name = row.get_nonnull("column_name"); - auto type = cql_type_parser::parse(ks_name, row.get_nonnull("type")); + auto type = cql_type_parser::parse(ks_name, row.get_nonnull("type"), ctxt.user_types()); auto time = row.get_nonnull("dropped_time"); builder.without_column(name, type, time.time_since_epoch().count()); } @@ -2842,7 +2845,8 @@ static computed_columns_map get_computed_columns(const schema_mutations& sm) { })); } -static std::vector create_columns_from_column_rows(const query::result_set& rows, +static std::vector create_columns_from_column_rows(const schema_ctxt& ctxt, + const query::result_set& rows, const sstring& keyspace, const sstring& table, /*, AbstractType rawComparator, */ @@ -2853,7 +2857,7 @@ static std::vector create_columns_from_column_rows(const quer std::vector columns; for (auto&& row : rows.rows()) { auto kind = deserialize_kind(row.get_nonnull("kind")); - auto type = cql_type_parser::parse(keyspace, row.get_nonnull("type")); + auto type = cql_type_parser::parse(keyspace, row.get_nonnull("type"), ctxt.user_types()); auto name_bytes = row.get_nonnull("column_name_bytes"); column_id position = row.get_nonnull("position"); @@ -2916,12 +2920,12 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm prepare_builder_from_table_row(ctxt, builder, row); auto computed_columns = get_computed_columns(sm); - auto column_defs = create_columns_from_column_rows(query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns); + auto column_defs = create_columns_from_column_rows(ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns); for (auto&& cdef : column_defs) { builder.with_column_ordered(cdef); } if (sm.view_virtual_columns_mutation()) { - column_defs = create_columns_from_column_rows(query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns); + column_defs = create_columns_from_column_rows(ctxt, query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns); for (auto&& cdef : column_defs) { builder.with_column_ordered(cdef); } @@ -3315,7 +3319,7 @@ future get_column_mapping(utils::UUID table_id, table_schema_ver std::vector static_columns, regular_columns; for (const auto& row : *results) { auto kind = deserialize_kind(row.get_as("kind")); - auto type = cql_type_parser::parse("" /*unused*/, row.get_as("type")); + auto type = cql_type_parser::parse("" /*unused*/, row.get_as("type"), data_dictionary::dummy_user_types_storage()); auto name_bytes = row.get_blob("column_name_bytes"); column_id position = row.get_as("position"); diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 29756c922d..89fd202472 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -27,6 +27,7 @@ namespace data_dictionary { class keyspace_metadata; +class user_types_storage; } using keyspace_metadata = data_dictionary::keyspace_metadata; @@ -67,7 +68,7 @@ class config; class schema_ctxt { public: - schema_ctxt(const config&); + schema_ctxt(const config&, std::shared_ptr uts); schema_ctxt(const replica::database&); schema_ctxt(distributed&); schema_ctxt(distributed&); @@ -84,10 +85,15 @@ public: return _schema_registry_grace_period; } + const data_dictionary::user_types_storage& user_types() const noexcept { + return *_user_types; + } + private: const db::extensions& _extensions; const unsigned _murmur3_partitioner_ignore_msb_bits; const uint32_t _schema_registry_grace_period; + const std::shared_ptr _user_types; }; namespace schema_tables { diff --git a/replica/database.cc b/replica/database.cc index 837b6b3170..b78b4cc152 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -303,9 +303,28 @@ void database::setup_scylla_memory_diagnostics_producer() { }); } +class db_user_types_storage : public data_dictionary::dummy_user_types_storage { + const replica::database* _db = nullptr; +public: + db_user_types_storage(const database& db) noexcept : _db(&db) {} + + virtual const user_types_metadata& get(const sstring& ks) const override { + if (_db == nullptr) { + return dummy_user_types_storage::get(ks); + } + + return _db->find_keyspace(ks).metadata()->user_types(); + } + + void deactivate() noexcept { + _db = nullptr; + } +}; + database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, abort_source& as, sharded& sst_dir_sem, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) + , _user_types(std::make_shared(*this)) , _cl_stats(std::make_unique()) , _cfg(cfg) // Allow system tables a pool of 10 MB memory to write, but never block on other regions. @@ -376,6 +395,14 @@ const db::extensions& database::extensions() const { return get_config().extensions(); } +std::shared_ptr database::as_user_types_storage() const noexcept { + return _user_types; +} + +const data_dictionary::user_types_storage& database::user_types() const noexcept { + return *_user_types; +} + } // namespace replica void backlog_controller::adjust() { @@ -713,6 +740,7 @@ void database::set_format(sstables::sstable_version_types format) noexcept { } database::~database() { + _user_types->deactivate(); } void database::update_version(const utils::UUID& version) { diff --git a/replica/database.hh b/replica/database.hh index 990e255221..b525dd08b1 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1199,6 +1199,8 @@ struct string_pair_eq { bool operator()(spair lhs, spair rhs) const; }; +class db_user_types_storage; + // Policy for distributed: // broadcast metadata writes // local metadata reads @@ -1253,6 +1255,7 @@ private: }; lw_shared_ptr _stats; + std::shared_ptr _user_types; std::unique_ptr _cl_stats; const db::config& _cfg; @@ -1320,6 +1323,8 @@ private: public: data_dictionary::database as_data_dictionary() const; + std::shared_ptr as_user_types_storage() const noexcept; + const data_dictionary::user_types_storage& user_types() const noexcept; future<> init_commitlog(); const gms::feature_service& features() const { return _feat; } future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout); diff --git a/test/boost/schema_registry_test.cc b/test/boost/schema_registry_test.cc index 55a1283806..088bf3823b 100644 --- a/test/boost/schema_registry_test.cc +++ b/test/boost/schema_registry_test.cc @@ -34,7 +34,7 @@ struct dummy_init { dummy_init() { config = std::make_unique(); - local_schema_registry().init(*config); + local_schema_registry().init(db::schema_ctxt(*config,std::make_shared())); } }; diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc index 802d0deb9a..fc306648b7 100644 --- a/tools/schema_loader.cc +++ b/tools/schema_loader.cc @@ -160,6 +160,21 @@ private: } }; +class user_types_storage : public data_dictionary::user_types_storage { + database& _db; +public: + user_types_storage(database& db) noexcept : _db(db) {} + + virtual const data_dictionary::user_types_metadata& get(const sstring& name) const override { + for (const auto& ks : _db.keyspaces) { + if (ks.metadata->name() == name) { + return ks.metadata->user_types(); + } + } + throw data_dictionary::no_such_keyspace(name); + } +}; + table::table(data_dictionary_impl& impl, keyspace& ks, schema_ptr schema) : ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this)) { } @@ -269,7 +284,7 @@ std::vector do_load_schemas(std::string_view schema_str) { db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, keyspace_name, table_name)); } auto name = row.get_nonnull("column_name"); - auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull("type")); + auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull("type"), user_types_storage(real_db)); auto time = row.get_nonnull("dropped_time"); *it = schema_builder(*it).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build(); }