Merge "Add user types carrier helper" from Pavel Emelyanov

"
There's a cql_type_parser::parse() method that needs to get user
types for a keyspace by its name. For this it uses the global
storage proxy instance as a place to get database from. This set
introduces an abstract user_types_storage helper object that's
responsible in providing the user types for the caller.

This helper, in turn, is provided to the parse() method by the
database itself or by the schema_ctxt object that needs parse()
to unfreeze schemas and doesn't have database at those times.

This removes one more get_storage_proxy() call.
"

* 'br-user-types-storage' of https://github.com/xemul/scylla:
  cql_type_parser: Require user_types_storage& in parse()
  schame_tables: Add db/ctxt args here and there
  user_types: Carry storage on database and schema_ctxt
  data_dictionary: Introduce user types storage
This commit is contained in:
Botond Dénes
2022-05-09 17:38:52 +03:00
9 changed files with 92 additions and 25 deletions

View File

@@ -36,4 +36,17 @@ public:
friend std::ostream& operator<<(std::ostream& os, const user_types_metadata& m);
};
class user_types_storage {
public:
virtual const user_types_metadata& get(const sstring& ks) const = 0;
};
class dummy_user_types_storage : public user_types_storage {
user_types_metadata _empty;
public:
virtual const user_types_metadata& get(const sstring& ks) const override {
return _empty;
}
};
}

View File

@@ -14,7 +14,6 @@
#include <boost/range/adaptor/sliced.hpp>
#include "replica/database.hh"
#include "service/storage_proxy.hh"
#include "cql3/CqlParser.hpp"
#include "cql3/util.hh"
#include "cql_type_parser.hh"
@@ -28,7 +27,7 @@ static ::shared_ptr<cql3::cql3_type::raw> parse_raw(const sstring& str) {
});
}
data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str) {
data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str, const data_dictionary::user_types_storage& uts) {
static const thread_local std::unordered_map<sstring, cql3::cql3_type> native_types = []{
std::unordered_map<sstring, cql3::cql3_type> res;
for (auto& nt : cql3::cql3_type::values()) {
@@ -42,12 +41,8 @@ data_type db::cql_type_parser::parse(const sstring& keyspace, const sstring& str
return i->second.get_type();
}
const auto& sp = service::get_storage_proxy();
const replica::user_types_metadata& user_types =
sp.local_is_initialized() ? sp.local().get_db().local().find_keyspace(keyspace).metadata()->user_types()
: replica::user_types_metadata{};
auto raw = parse_raw(str);
return raw->prepare_internal(keyspace, user_types).get_type();
return raw->prepare_internal(keyspace, uts.get(keyspace)).get_type();
}
class db::cql_type_parser::raw_builder::impl {

View File

@@ -24,12 +24,13 @@ class types_metadata;
namespace data_dictionary {
class keyspace_metadata;
class user_types_storage;
}
namespace db {
namespace cql_type_parser {
data_type parse(const sstring& keyspace, const sstring& type);
data_type parse(const sstring& keyspace, const sstring& type, const data_dictionary::user_types_storage& uts);
class raw_builder {
public:

View File

@@ -91,14 +91,15 @@ static bool is_extra_durable(const sstring& ks_name, const sstring& cf_name) {
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
namespace db {
schema_ctxt::schema_ctxt(const db::config& cfg)
schema_ctxt::schema_ctxt(const db::config& cfg, std::shared_ptr<data_dictionary::user_types_storage> uts)
: _extensions(cfg.extensions())
, _murmur3_partitioner_ignore_msb_bits(cfg.murmur3_partitioner_ignore_msb_bits())
, _schema_registry_grace_period(cfg.schema_registry_grace_period())
, _user_types(std::move(uts))
{}
schema_ctxt::schema_ctxt(const replica::database& db)
: schema_ctxt(db.get_config())
: schema_ctxt(db.get_config(), db.as_user_types_storage())
{}
schema_ctxt::schema_ctxt(distributed<replica::database>& db)
@@ -164,6 +165,7 @@ using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
static computed_columns_map get_computed_columns(const schema_mutations& sm);
static std::vector<column_definition> create_columns_from_column_rows(
const schema_ctxt& ctxt,
const query::result_set& rows, const sstring& keyspace,
const sstring& table, bool is_super, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns);
@@ -1506,10 +1508,10 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
}};
}
static std::vector<data_type> read_arg_types(const query::result_set_row& row, const sstring& keyspace) {
static std::vector<data_type> read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace) {
std::vector<data_type> arg_types;
for (const auto& arg : get_list<sstring>(row, "argument_types")) {
arg_types.push_back(db::cql_type_parser::parse(keyspace, arg));
arg_types.push_back(db::cql_type_parser::parse(keyspace, arg, db.user_types()));
}
return arg_types;
}
@@ -1572,8 +1574,8 @@ static std::vector<data_type> read_arg_types(const query::result_set_row& row, c
static shared_ptr<cql3::functions::user_function> create_func(replica::database& db, const query::result_set_row& row) {
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
auto arg_types = read_arg_types(row, name.keyspace);
data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("return_type"));
auto arg_types = read_arg_types(db, row, name.keyspace);
data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("return_type"), db.user_types());
// FIXME: We already computed the bitcode in
// create_function_statement, but it is not clear how to get it
@@ -1610,8 +1612,8 @@ static shared_ptr<cql3::functions::user_function> create_func(replica::database&
static shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row) {
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("aggregate_name")};
auto arg_types = read_arg_types(row, name.keyspace);
data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("state_type"));
auto arg_types = read_arg_types(db, row, name.keyspace);
data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("state_type"), db.user_types());
sstring sfunc = row.get_nonnull<sstring>("state_func");
auto ffunc = row.get<sstring>("final_func");
auto initcond_str = row.get<sstring>("initcond");
@@ -2661,6 +2663,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
auto computed_columns = get_computed_columns(sm);
std::vector<column_definition> column_defs = create_columns_from_column_rows(
ctxt,
query::result_set(sm.columns_mutation()),
ks_name,
cf_name,/*,
@@ -2691,7 +2694,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
query::result_set dcr(*sm.dropped_columns_mutation());
for (auto& row : dcr.rows()) {
auto name = row.get_nonnull<sstring>("column_name");
auto type = cql_type_parser::parse(ks_name, row.get_nonnull<sstring>("type"));
auto type = cql_type_parser::parse(ks_name, row.get_nonnull<sstring>("type"), ctxt.user_types());
auto time = row.get_nonnull<db_clock::time_point>("dropped_time");
builder.without_column(name, type, time.time_since_epoch().count());
}
@@ -2842,7 +2845,8 @@ static computed_columns_map get_computed_columns(const schema_mutations& sm) {
}));
}
static std::vector<column_definition> create_columns_from_column_rows(const query::result_set& rows,
static std::vector<column_definition> create_columns_from_column_rows(const schema_ctxt& ctxt,
const query::result_set& rows,
const sstring& keyspace,
const sstring& table, /*,
AbstractType<?> rawComparator, */
@@ -2853,7 +2857,7 @@ static std::vector<column_definition> create_columns_from_column_rows(const quer
std::vector<column_definition> columns;
for (auto&& row : rows.rows()) {
auto kind = deserialize_kind(row.get_nonnull<sstring>("kind"));
auto type = cql_type_parser::parse(keyspace, row.get_nonnull<sstring>("type"));
auto type = cql_type_parser::parse(keyspace, row.get_nonnull<sstring>("type"), ctxt.user_types());
auto name_bytes = row.get_nonnull<bytes>("column_name_bytes");
column_id position = row.get_nonnull<int32_t>("position");
@@ -2916,12 +2920,12 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
prepare_builder_from_table_row(ctxt, builder, row);
auto computed_columns = get_computed_columns(sm);
auto column_defs = create_columns_from_column_rows(query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns);
auto column_defs = create_columns_from_column_rows(ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns);
for (auto&& cdef : column_defs) {
builder.with_column_ordered(cdef);
}
if (sm.view_virtual_columns_mutation()) {
column_defs = create_columns_from_column_rows(query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns);
column_defs = create_columns_from_column_rows(ctxt, query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns);
for (auto&& cdef : column_defs) {
builder.with_column_ordered(cdef);
}
@@ -3315,7 +3319,7 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
std::vector<column_definition> static_columns, regular_columns;
for (const auto& row : *results) {
auto kind = deserialize_kind(row.get_as<sstring>("kind"));
auto type = cql_type_parser::parse("" /*unused*/, row.get_as<sstring>("type"));
auto type = cql_type_parser::parse("" /*unused*/, row.get_as<sstring>("type"), data_dictionary::dummy_user_types_storage());
auto name_bytes = row.get_blob("column_name_bytes");
column_id position = row.get_as<int32_t>("position");

View File

@@ -27,6 +27,7 @@
namespace data_dictionary {
class keyspace_metadata;
class user_types_storage;
}
using keyspace_metadata = data_dictionary::keyspace_metadata;
@@ -67,7 +68,7 @@ class config;
class schema_ctxt {
public:
schema_ctxt(const config&);
schema_ctxt(const config&, std::shared_ptr<data_dictionary::user_types_storage> uts);
schema_ctxt(const replica::database&);
schema_ctxt(distributed<replica::database>&);
schema_ctxt(distributed<service::storage_proxy>&);
@@ -84,10 +85,15 @@ public:
return _schema_registry_grace_period;
}
const data_dictionary::user_types_storage& user_types() const noexcept {
return *_user_types;
}
private:
const db::extensions& _extensions;
const unsigned _murmur3_partitioner_ignore_msb_bits;
const uint32_t _schema_registry_grace_period;
const std::shared_ptr<data_dictionary::user_types_storage> _user_types;
};
namespace schema_tables {

View File

@@ -303,9 +303,28 @@ void database::setup_scylla_memory_diagnostics_producer() {
});
}
class db_user_types_storage : public data_dictionary::dummy_user_types_storage {
const replica::database* _db = nullptr;
public:
db_user_types_storage(const database& db) noexcept : _db(&db) {}
virtual const user_types_metadata& get(const sstring& ks) const override {
if (_db == nullptr) {
return dummy_user_types_storage::get(ks);
}
return _db->find_keyspace(ks).metadata()->user_types();
}
void deactivate() noexcept {
_db = nullptr;
}
};
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
abort_source& as, sharded<semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier)
: _stats(make_lw_shared<db_stats>())
, _user_types(std::make_shared<db_user_types_storage>(*this))
, _cl_stats(std::make_unique<cell_locker_stats>())
, _cfg(cfg)
// Allow system tables a pool of 10 MB memory to write, but never block on other regions.
@@ -376,6 +395,14 @@ const db::extensions& database::extensions() const {
return get_config().extensions();
}
std::shared_ptr<data_dictionary::user_types_storage> database::as_user_types_storage() const noexcept {
return _user_types;
}
const data_dictionary::user_types_storage& database::user_types() const noexcept {
return *_user_types;
}
} // namespace replica
void backlog_controller::adjust() {
@@ -713,6 +740,7 @@ void database::set_format(sstables::sstable_version_types format) noexcept {
}
database::~database() {
_user_types->deactivate();
}
void database::update_version(const utils::UUID& version) {

View File

@@ -1199,6 +1199,8 @@ struct string_pair_eq {
bool operator()(spair lhs, spair rhs) const;
};
class db_user_types_storage;
// Policy for distributed<database>:
// broadcast metadata writes
// local metadata reads
@@ -1253,6 +1255,7 @@ private:
};
lw_shared_ptr<db_stats> _stats;
std::shared_ptr<db_user_types_storage> _user_types;
std::unique_ptr<cell_locker_stats> _cl_stats;
const db::config& _cfg;
@@ -1320,6 +1323,8 @@ private:
public:
data_dictionary::database as_data_dictionary() const;
std::shared_ptr<data_dictionary::user_types_storage> as_user_types_storage() const noexcept;
const data_dictionary::user_types_storage& user_types() const noexcept;
future<> init_commitlog();
const gms::feature_service& features() const { return _feat; }
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);

View File

@@ -34,7 +34,7 @@ struct dummy_init {
dummy_init() {
config = std::make_unique<db::config>();
local_schema_registry().init(*config);
local_schema_registry().init(db::schema_ctxt(*config,std::make_shared<data_dictionary::dummy_user_types_storage>()));
}
};

View File

@@ -160,6 +160,21 @@ private:
}
};
class user_types_storage : public data_dictionary::user_types_storage {
database& _db;
public:
user_types_storage(database& db) noexcept : _db(db) {}
virtual const data_dictionary::user_types_metadata& get(const sstring& name) const override {
for (const auto& ks : _db.keyspaces) {
if (ks.metadata->name() == name) {
return ks.metadata->user_types();
}
}
throw data_dictionary::no_such_keyspace(name);
}
};
table::table(data_dictionary_impl& impl, keyspace& ks, schema_ptr schema) :
ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this))
{ }
@@ -269,7 +284,7 @@ std::vector<schema_ptr> do_load_schemas(std::string_view schema_str) {
db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, keyspace_name, table_name));
}
auto name = row.get_nonnull<sstring>("column_name");
auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull<sstring>("type"));
auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull<sstring>("type"), user_types_storage(real_db));
auto time = row.get_nonnull<db_clock::time_point>("dropped_time");
*it = schema_builder(*it).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build();
}