From 860dbab47474a33eee2a094c262ed713eb1cd521 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 5 May 2022 08:31:49 +0300 Subject: [PATCH 1/4] data_dictionary: Introduce user types storage The interface in question will be used by cql type parser to get user types. There are already three possible implementations of it: - dummy, when no user types are in use (e.g. tests) - schema-loader one, which gets user types from keyspaces that are collected on its implementation of the database - replica::database one, which does the same, but uses the real database instance and that will be shared between scema_ctxts Signed-off-by: Pavel Emelyanov --- data_dictionary/user_types_metadata.hh | 13 +++++++++++++ replica/database.cc | 18 ++++++++++++++++++ tools/schema_loader.cc | 15 +++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/data_dictionary/user_types_metadata.hh b/data_dictionary/user_types_metadata.hh index b2265b29a6..b998c588de 100644 --- a/data_dictionary/user_types_metadata.hh +++ b/data_dictionary/user_types_metadata.hh @@ -36,4 +36,17 @@ public: friend std::ostream& operator<<(std::ostream& os, const user_types_metadata& m); }; +class user_types_storage { +public: + virtual const user_types_metadata& get(const sstring& ks) const = 0; +}; + +class dummy_user_types_storage : public user_types_storage { + user_types_metadata _empty; +public: + virtual const user_types_metadata& get(const sstring& ks) const override { + return _empty; + } +}; + } diff --git a/replica/database.cc b/replica/database.cc index 23a487cb2e..f04995063d 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -303,6 +303,24 @@ void database::setup_scylla_memory_diagnostics_producer() { }); } +class db_user_types_storage : public data_dictionary::dummy_user_types_storage { + const replica::database* _db = nullptr; +public: + db_user_types_storage(const database& db) noexcept : _db(&db) {} + + virtual const user_types_metadata& get(const sstring& ks) const override { + if (_db == nullptr) { + return dummy_user_types_storage::get(ks); + } + + return _db->find_keyspace(ks).metadata()->user_types(); + } + + void deactivate() noexcept { + _db = nullptr; + } +}; + database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, abort_source& as, sharded& sst_dir_sem, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc index 802d0deb9a..6f60ff71ff 100644 --- a/tools/schema_loader.cc +++ b/tools/schema_loader.cc @@ -160,6 +160,21 @@ private: } }; +class user_types_storage : public data_dictionary::user_types_storage { + database& _db; +public: + user_types_storage(database& db) noexcept : _db(db) {} + + virtual const data_dictionary::user_types_metadata& get(const sstring& name) const override { + for (const auto& ks : _db.keyspaces) { + if (ks.metadata->name() == name) { + return ks.metadata->user_types(); + } + } + throw data_dictionary::no_such_keyspace(name); + } +}; + table::table(data_dictionary_impl& impl, keyspace& ks, schema_ptr schema) : ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this)) { } From 2104d90dd04a3786d011219aa2b6128c2ad01ec9 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 5 May 2022 09:32:44 +0300 Subject: [PATCH 2/4] user_types: Carry storage on database and schema_ctxt The user types storage is needed in cql_type_parser::parse which is in turn called with either replica::database or scema_ctxt at hand. To facilitate the former case replica::database has its own user types storage created in database constructor. The latter case is a bit trickier. In many cases the ctxt is created as a temporary object and the database is available at those places. Also the ctxt object lives on the schema_registry instance which doesn't have database nearby. However, that ctxt lifetime is the same as the registry instance one and when it's created there's a database at hand (it's the database constructor that calls schema_registry.init() passing "this" into it). Thus, the solution is to make database's user types storage be a shared pointer that's shared between database itself and all the ctxts out there including the one that lives on schema_registry instance. When database goes away it .deactivate()s its user types storage so that any ctxts that may share it stay on the safe side and don't use database after free. This part will go away when the schema_registry will be deglobalized. Signed-off-by: Pavel Emelyanov --- db/schema_tables.cc | 5 +++-- db/schema_tables.hh | 4 +++- replica/database.cc | 6 ++++++ replica/database.hh | 4 ++++ test/boost/schema_registry_test.cc | 2 +- 5 files changed, 17 insertions(+), 4 deletions(-) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 32791f4ab4..0cf7d7f5f7 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -91,14 +91,15 @@ static bool is_extra_durable(const sstring& ks_name, const sstring& cf_name) { /** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ namespace db { -schema_ctxt::schema_ctxt(const db::config& cfg) +schema_ctxt::schema_ctxt(const db::config& cfg, std::shared_ptr uts) : _extensions(cfg.extensions()) , _murmur3_partitioner_ignore_msb_bits(cfg.murmur3_partitioner_ignore_msb_bits()) , _schema_registry_grace_period(cfg.schema_registry_grace_period()) + , _user_types(std::move(uts)) {} schema_ctxt::schema_ctxt(const replica::database& db) - : schema_ctxt(db.get_config()) + : schema_ctxt(db.get_config(), db.as_user_types_storage()) {} schema_ctxt::schema_ctxt(distributed& db) diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 29756c922d..aa77f24ea2 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -27,6 +27,7 @@ namespace data_dictionary { class keyspace_metadata; +class user_types_storage; } using keyspace_metadata = data_dictionary::keyspace_metadata; @@ -67,7 +68,7 @@ class config; class schema_ctxt { public: - schema_ctxt(const config&); + schema_ctxt(const config&, std::shared_ptr uts); schema_ctxt(const replica::database&); schema_ctxt(distributed&); schema_ctxt(distributed&); @@ -88,6 +89,7 @@ private: const db::extensions& _extensions; const unsigned _murmur3_partitioner_ignore_msb_bits; const uint32_t _schema_registry_grace_period; + const std::shared_ptr _user_types; }; namespace schema_tables { diff --git a/replica/database.cc b/replica/database.cc index f04995063d..edaddf4109 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -324,6 +324,7 @@ public: database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, abort_source& as, sharded& sst_dir_sem, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) + , _user_types(std::make_shared(*this)) , _cl_stats(std::make_unique()) , _cfg(cfg) // Allow system tables a pool of 10 MB memory to write, but never block on other regions. @@ -394,6 +395,10 @@ const db::extensions& database::extensions() const { return get_config().extensions(); } +std::shared_ptr database::as_user_types_storage() const noexcept { + return _user_types; +} + } // namespace replica void backlog_controller::adjust() { @@ -731,6 +736,7 @@ void database::set_format(sstables::sstable_version_types format) noexcept { } database::~database() { + _user_types->deactivate(); } void database::update_version(const utils::UUID& version) { diff --git a/replica/database.hh b/replica/database.hh index cc7f07f614..74242d1419 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1206,6 +1206,8 @@ struct string_pair_eq { bool operator()(spair lhs, spair rhs) const; }; +class db_user_types_storage; + // Policy for distributed: // broadcast metadata writes // local metadata reads @@ -1260,6 +1262,7 @@ private: }; lw_shared_ptr _stats; + std::shared_ptr _user_types; std::unique_ptr _cl_stats; const db::config& _cfg; @@ -1327,6 +1330,7 @@ private: public: data_dictionary::database as_data_dictionary() const; + std::shared_ptr as_user_types_storage() const noexcept; future<> init_commitlog(); const gms::feature_service& features() const { return _feat; } future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout); diff --git a/test/boost/schema_registry_test.cc b/test/boost/schema_registry_test.cc index 55a1283806..088bf3823b 100644 --- a/test/boost/schema_registry_test.cc +++ b/test/boost/schema_registry_test.cc @@ -34,7 +34,7 @@ struct dummy_init { dummy_init() { config = std::make_unique(); - local_schema_registry().init(*config); + local_schema_registry().init(db::schema_ctxt(*config,std::make_shared())); } }; From 44f38d4de21652058d6af5eb0a06104249784074 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 5 May 2022 08:44:26 +0300 Subject: [PATCH 3/4] schame_tables: Add db/ctxt args here and there This is to have them in places that call cql_type_parser::parse. Pure churn reduction for the next patch. Signed-off-by: Pavel Emelyanov --- db/schema_tables.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 0cf7d7f5f7..e660660e3a 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -165,6 +165,7 @@ using computed_columns_map = std::unordered_map; static computed_columns_map get_computed_columns(const schema_mutations& sm); static std::vector create_columns_from_column_rows( + const schema_ctxt& ctxt, const query::result_set& rows, const sstring& keyspace, const sstring& table, bool is_super, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns); @@ -1507,7 +1508,7 @@ static future merge_types(distributed read_arg_types(const query::result_set_row& row, const sstring& keyspace) { +static std::vector read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace) { std::vector arg_types; for (const auto& arg : get_list(row, "argument_types")) { arg_types.push_back(db::cql_type_parser::parse(keyspace, arg)); @@ -1573,7 +1574,7 @@ static std::vector read_arg_types(const query::result_set_row& row, c static shared_ptr create_func(replica::database& db, const query::result_set_row& row) { cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("function_name")}; - auto arg_types = read_arg_types(row, name.keyspace); + auto arg_types = read_arg_types(db, row, name.keyspace); data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("return_type")); // FIXME: We already computed the bitcode in @@ -1611,7 +1612,7 @@ static shared_ptr create_func(replica::database& static shared_ptr create_aggregate(replica::database& db, const query::result_set_row& row) { cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("aggregate_name")}; - auto arg_types = read_arg_types(row, name.keyspace); + auto arg_types = read_arg_types(db, row, name.keyspace); data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("state_type")); sstring sfunc = row.get_nonnull("state_func"); auto ffunc = row.get("final_func"); @@ -2662,6 +2663,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations auto computed_columns = get_computed_columns(sm); std::vector column_defs = create_columns_from_column_rows( + ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name,/*, @@ -2843,7 +2845,8 @@ static computed_columns_map get_computed_columns(const schema_mutations& sm) { })); } -static std::vector create_columns_from_column_rows(const query::result_set& rows, +static std::vector create_columns_from_column_rows(const schema_ctxt& ctxt, + const query::result_set& rows, const sstring& keyspace, const sstring& table, /*, AbstractType rawComparator, */ @@ -2917,12 +2920,12 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm prepare_builder_from_table_row(ctxt, builder, row); auto computed_columns = get_computed_columns(sm); - auto column_defs = create_columns_from_column_rows(query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns); + auto column_defs = create_columns_from_column_rows(ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns); for (auto&& cdef : column_defs) { builder.with_column_ordered(cdef); } if (sm.view_virtual_columns_mutation()) { - column_defs = create_columns_from_column_rows(query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns); + column_defs = create_columns_from_column_rows(ctxt, query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns); for (auto&& cdef : column_defs) { builder.with_column_ordered(cdef); } From 0f698910e80748f48efafdb6ac434b52b0bd9fc4 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 5 May 2022 08:45:45 +0300 Subject: [PATCH 4/4] cql_type_parser: Require user_types_storage& in parse() Right now to get user types the method in question gets global proxy instance to get database from it and then peek a keyspace, its metadata and, finally, the user types. There's also a safety check for proxy not being initialized, which happens in tests. Instead of messing with the proxy, the parse() method now accepts the user_types_storage reference from which it gets the types. All the callers already have the needed storage at hand -- in most of the cases it's one shared between the database and schema_ctxt. In case of tests is's a dummy storage, in case of schema-loader it's its local one. The get_column_mapping() is special -- it doesn't expect any user-types to be parsed and passes "" keyspace into it, neither it has db/ctxt to get types storage from, so it can safely use the dummy one. Signed-off-by: Pavel Emelyanov --- db/cql_type_parser.cc | 9 ++------- db/cql_type_parser.hh | 3 ++- db/schema_tables.cc | 12 ++++++------ db/schema_tables.hh | 4 ++++ replica/database.cc | 4 ++++ replica/database.hh | 1 + tools/schema_loader.cc | 2 +- 7 files changed, 20 insertions(+), 15 deletions(-) diff --git a/db/cql_type_parser.cc b/db/cql_type_parser.cc index ee1c92eea9..6a81cbfcfd 100644 --- a/db/cql_type_parser.cc +++ b/db/cql_type_parser.cc @@ -14,7 +14,6 @@ #include #include "replica/database.hh" -#include "service/storage_proxy.hh" #include "cql3/CqlParser.hpp" #include "cql3/util.hh" #include "cql_type_parser.hh" @@ -28,7 +27,7 @@ static ::shared_ptr parse_raw(const sstring& str) { }); } -data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str) { +data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str, const data_dictionary::user_types_storage& uts) { static const thread_local std::unordered_map native_types = []{ std::unordered_map res; for (auto& nt : cql3::cql3_type::values()) { @@ -42,12 +41,8 @@ data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str return i->second.get_type(); } - const auto& sp = service::get_storage_proxy(); - const replica::user_types_metadata& user_types = - sp.local_is_initialized() ? sp.local().get_db().local().find_keyspace(keyspace).metadata()->user_types() - : replica::user_types_metadata{}; auto raw = parse_raw(str); - return raw->prepare_internal(keyspace, user_types).get_type(); + return raw->prepare_internal(keyspace, uts.get(keyspace)).get_type(); } class db::cql_type_parser::raw_builder::impl { diff --git a/db/cql_type_parser.hh b/db/cql_type_parser.hh index 2b4d0fd8a8..a88a1e0c1a 100644 --- a/db/cql_type_parser.hh +++ b/db/cql_type_parser.hh @@ -24,12 +24,13 @@ class types_metadata; namespace data_dictionary { class keyspace_metadata; +class user_types_storage; } namespace db { namespace cql_type_parser { -data_type parse(const sstring& keyspace, const sstring& type); +data_type parse(const sstring& keyspace, const sstring& type, const data_dictionary::user_types_storage& uts); class raw_builder { public: diff --git a/db/schema_tables.cc b/db/schema_tables.cc index e660660e3a..f35cd1eb17 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1511,7 +1511,7 @@ static future merge_types(distributed read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace) { std::vector arg_types; for (const auto& arg : get_list(row, "argument_types")) { - arg_types.push_back(db::cql_type_parser::parse(keyspace, arg)); + arg_types.push_back(db::cql_type_parser::parse(keyspace, arg, db.user_types())); } return arg_types; } @@ -1575,7 +1575,7 @@ static shared_ptr create_func(replica::database& cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("function_name")}; auto arg_types = read_arg_types(db, row, name.keyspace); - data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("return_type")); + data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("return_type"), db.user_types()); // FIXME: We already computed the bitcode in // create_function_statement, but it is not clear how to get it @@ -1613,7 +1613,7 @@ static shared_ptr create_aggregate(replica::dat cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("aggregate_name")}; auto arg_types = read_arg_types(db, row, name.keyspace); - data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("state_type")); + data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull("state_type"), db.user_types()); sstring sfunc = row.get_nonnull("state_func"); auto ffunc = row.get("final_func"); auto initcond_str = row.get("initcond"); @@ -2694,7 +2694,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations query::result_set dcr(*sm.dropped_columns_mutation()); for (auto& row : dcr.rows()) { auto name = row.get_nonnull("column_name"); - auto type = cql_type_parser::parse(ks_name, row.get_nonnull("type")); + auto type = cql_type_parser::parse(ks_name, row.get_nonnull("type"), ctxt.user_types()); auto time = row.get_nonnull("dropped_time"); builder.without_column(name, type, time.time_since_epoch().count()); } @@ -2857,7 +2857,7 @@ static std::vector create_columns_from_column_rows(const sche std::vector columns; for (auto&& row : rows.rows()) { auto kind = deserialize_kind(row.get_nonnull("kind")); - auto type = cql_type_parser::parse(keyspace, row.get_nonnull("type")); + auto type = cql_type_parser::parse(keyspace, row.get_nonnull("type"), ctxt.user_types()); auto name_bytes = row.get_nonnull("column_name_bytes"); column_id position = row.get_nonnull("position"); @@ -3319,7 +3319,7 @@ future get_column_mapping(utils::UUID table_id, table_schema_ver std::vector static_columns, regular_columns; for (const auto& row : *results) { auto kind = deserialize_kind(row.get_as("kind")); - auto type = cql_type_parser::parse("" /*unused*/, row.get_as("type")); + auto type = cql_type_parser::parse("" /*unused*/, row.get_as("type"), data_dictionary::dummy_user_types_storage()); auto name_bytes = row.get_blob("column_name_bytes"); column_id position = row.get_as("position"); diff --git a/db/schema_tables.hh b/db/schema_tables.hh index aa77f24ea2..89fd202472 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -85,6 +85,10 @@ public: return _schema_registry_grace_period; } + const data_dictionary::user_types_storage& user_types() const noexcept { + return *_user_types; + } + private: const db::extensions& _extensions; const unsigned _murmur3_partitioner_ignore_msb_bits; diff --git a/replica/database.cc b/replica/database.cc index edaddf4109..f3b41a69e1 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -399,6 +399,10 @@ std::shared_ptr database::as_user_types_sto return _user_types; } +const data_dictionary::user_types_storage& database::user_types() const noexcept { + return *_user_types; +} + } // namespace replica void backlog_controller::adjust() { diff --git a/replica/database.hh b/replica/database.hh index 74242d1419..31bdf79758 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1331,6 +1331,7 @@ private: public: data_dictionary::database as_data_dictionary() const; std::shared_ptr as_user_types_storage() const noexcept; + const data_dictionary::user_types_storage& user_types() const noexcept; future<> init_commitlog(); const gms::feature_service& features() const { return _feat; } future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout); diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc index 6f60ff71ff..fc306648b7 100644 --- a/tools/schema_loader.cc +++ b/tools/schema_loader.cc @@ -284,7 +284,7 @@ std::vector do_load_schemas(std::string_view schema_str) { db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, keyspace_name, table_name)); } auto name = row.get_nonnull("column_name"); - auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull("type")); + auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull("type"), user_types_storage(real_db)); auto time = row.get_nonnull("dropped_time"); *it = schema_builder(*it).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build(); }