Files
scylladb/tools/schema_loader.cc
Michael Litvak 06343b58a2 cql3: add warning when creating mv/index with tablets about rf-rack
Creating a MV or index in a tablets-based keyspace now forces additional
restrictions on the keyspace. The keyspace must be RF-rack-valid and it
must remain RF-rack-valid while the view exists.

Add a CQL warning about these restrictions.
2025-12-22 09:21:06 +01:00

692 lines
33 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/assert.hh"
#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <fmt/ranges.h>
#include <seastar/core/fstream.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/short_streams.hh>
#include "cdc/cdc_partitioner.hh"
#include "cdc/log.hh"
#include "cdc/cdc_options.hh"
#include "cql3/query_processor.hh"
#include "cql3/statements/create_keyspace_statement.hh"
#include "cql3/statements/create_table_statement.hh"
#include "cql3/statements/create_type_statement.hh"
#include "cql3/statements/create_view_statement.hh"
#include "cql3/statements/create_index_statement.hh"
#include "cql3/statements/update_statement.hh"
#include "db/cql_type_parser.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
#include "db/system_keyspace.hh"
#include "partition_slice_builder.hh"
#include "readers/combined.hh"
#include "replica/database.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_directory.hh"
#include "types/list.hh"
#include "data_dictionary/impl.hh"
#include "data_dictionary/data_dictionary.hh"
#include "gms/feature_service.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/local_strategy.hh"
#include "tools/schema_loader.hh"
#include "tools/read_mutation.hh"
#include "view_info.hh"
namespace {
logging::logger sllog("schema_loader");
class data_dictionary_impl;
struct keyspace;
struct table;
struct database {
const db::config& cfg;
gms::feature_service& features;
std::list<keyspace> keyspaces;
std::list<table> tables;
database(const db::config& cfg, gms::feature_service& features) : cfg(cfg), features(features)
{ }
database(database&&) = delete;
};
struct keyspace {
lw_shared_ptr<keyspace_metadata> metadata;
explicit keyspace(lw_shared_ptr<keyspace_metadata> metadata) : metadata(std::move(metadata))
{ }
keyspace(keyspace&&) = delete;
};
struct table {
const keyspace& ks;
schema_ptr schema;
secondary_index::secondary_index_manager secondary_idx_man;
bool user;
table(data_dictionary_impl& impl, const keyspace& ks, schema_ptr schema, bool user);
table(table&&) = delete;
};
class data_dictionary_impl : public data_dictionary::impl {
public:
const data_dictionary::database wrap(const database& db) const {
return make_database(this, &db);
}
data_dictionary::keyspace wrap(const keyspace& ks) const {
return make_keyspace(this, &ks);
}
data_dictionary::table wrap(const table& t) const {
return make_table(this, &t);
}
static const database& unwrap(data_dictionary::database db) {
return *static_cast<const database*>(extract(db));
}
static const keyspace& unwrap(data_dictionary::keyspace ks) {
return *static_cast<const keyspace*>(extract(ks));
}
static const table& unwrap(data_dictionary::table t) {
return *static_cast<const table*>(extract(t));
}
private:
virtual const table_schema_version& get_version(data_dictionary::database) const override {
throw std::bad_function_call();
}
virtual std::optional<data_dictionary::keyspace> try_find_keyspace(data_dictionary::database db, std::string_view name) const override {
auto& keyspaces = unwrap(db).keyspaces;
auto it = std::find_if(keyspaces.begin(), keyspaces.end(), [name] (const keyspace& ks) { return ks.metadata->name() == name; });
if (it == keyspaces.end()) {
return {};
}
return wrap(*it);
}
virtual std::vector<data_dictionary::keyspace> get_keyspaces(data_dictionary::database db) const override {
return unwrap(db).keyspaces | std::views::transform([this] (const keyspace& ks) { return wrap(ks); }) | std::ranges::to<std::vector<data_dictionary::keyspace>>();
}
virtual std::vector<sstring> get_user_keyspaces(data_dictionary::database db) const override {
return std::ranges::to<std::vector<sstring>>(
unwrap(db).keyspaces
| std::views::transform([] (const keyspace& ks) { return ks.metadata->name(); })
| std::views::filter([] (const sstring& ks) {return !is_internal_keyspace(ks); })
);
}
virtual std::vector<sstring> get_all_keyspaces(data_dictionary::database db) const override {
return unwrap(db).keyspaces | std::views::transform([] (const keyspace& ks) { return ks.metadata->name(); }) | std::ranges::to<std::vector<sstring>>();
}
virtual std::vector<data_dictionary::table> get_tables(data_dictionary::database db) const override {
return unwrap(db).tables | std::views::transform([this] (const table& ks) { return wrap(ks); }) | std::ranges::to<std::vector<data_dictionary::table>>();
}
virtual std::optional<data_dictionary::table> try_find_table(data_dictionary::database db, std::string_view ks, std::string_view tab) const override {
auto& tables = unwrap(db).tables;
auto it = std::find_if(tables.begin(), tables.end(), [ks, tab] (const table& tbl) { return tbl.schema->ks_name() == ks && tbl.schema->cf_name() == tab; });
if (it == tables.end()) {
return {};
}
return wrap(*it);
}
virtual std::optional<data_dictionary::table> try_find_table(data_dictionary::database db, table_id id) const override {
auto& tables = unwrap(db).tables;
auto it = std::find_if(tables.begin(), tables.end(), [id] (const table& tbl) { return tbl.schema->id() == id; });
if (it == tables.end()) {
return {};
}
return wrap(*it);
}
virtual const secondary_index::secondary_index_manager& get_index_manager(data_dictionary::table t) const override {
return unwrap(t).secondary_idx_man;
}
virtual schema_ptr get_table_schema(data_dictionary::table t) const override {
return unwrap(t).schema;
}
virtual db_clock::time_point get_truncation_time(data_dictionary::table t) const override {
return {};
}
virtual lw_shared_ptr<keyspace_metadata> get_keyspace_metadata(data_dictionary::keyspace ks) const override {
return unwrap(ks).metadata;
}
virtual bool is_internal(data_dictionary::keyspace ks) const override {
return is_system_keyspace(unwrap(ks).metadata->name());
}
virtual const locator::abstract_replication_strategy& get_replication_strategy(data_dictionary::keyspace ks) const override {
static const locator::local_strategy strategy{locator::replication_strategy_params{locator::replication_strategy_config_options{}, {}, data_dictionary::consistency_config_option::eventual}, nullptr};
return strategy;
}
virtual const std::vector<view_ptr>& get_table_views(data_dictionary::table t) const override {
static const std::vector<view_ptr> empty;
return empty;
}
virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view cf_name,
std::optional<sstring> index_name_root) const override {
auto has_schema = [&] (std::string_view ks_name, std::string_view table_name) {
const auto& tables = unwrap(db).tables;
return std::find_if(tables.begin(), tables.end(), [&] (const table& t) {
return t.schema->ks_name() == ks_name && t.schema->cf_name() == table_name;
}) != tables.end();
};
return secondary_index::get_available_index_name(ks_name, cf_name, index_name_root, existing_index_names(db, ks_name), has_schema);
}
virtual std::set<sstring> existing_index_names(data_dictionary::database db, std::string_view ks_name, std::string_view cf_to_exclude = {}) const override {
auto tables = std::ranges::to<std::vector<schema_ptr>>(unwrap(db).tables
| std::views::filter([ks_name] (const table& t) { return t.schema->ks_name() == ks_name; })
| std::views::transform([] (const table& t) { return t.schema; }));
return secondary_index::existing_index_names(tables, cf_to_exclude);
}
virtual schema_ptr find_indexed_table(data_dictionary::database db, std::string_view ks_name, std::string_view index_name) const override {
return {};
}
virtual schema_ptr get_cdc_base_table(data_dictionary::database db, const schema&) const override {
return {};
}
virtual const db::config& get_config(data_dictionary::database db) const override {
return unwrap(db).cfg;
}
virtual const db::extensions& get_extensions(data_dictionary::database db) const override {
return get_config(db).extensions();
}
virtual const gms::feature_service& get_features(data_dictionary::database db) const override {
return unwrap(db).features;
}
virtual replica::database& real_database(data_dictionary::database db) const override {
throw std::bad_function_call();
}
virtual replica::database* real_database_ptr(data_dictionary::database db) const override {
return nullptr;
}
};
class user_types_storage : public data_dictionary::user_types_storage {
database& _db;
public:
user_types_storage(database& db) noexcept : _db(db) {}
virtual const data_dictionary::user_types_metadata& get(const sstring& name) const override {
for (const auto& ks : _db.keyspaces) {
if (ks.metadata->name() == name) {
return ks.metadata->user_types();
}
}
throw data_dictionary::no_such_keyspace(name);
}
};
table::table(data_dictionary_impl& impl, const keyspace& ks, schema_ptr schema, bool user) :
ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this)), user(user)
{ }
sstring read_file(std::filesystem::path path) {
auto file = open_file_dma(path.native(), open_flags::ro).get();
auto fstream = make_file_input_stream(file);
return util::read_entire_stream_contiguous(fstream).get();
}
std::vector<schema_ptr> do_load_schemas(const db::config& cfg, std::string_view schema_str) {
cql3::cql_stats cql_stats;
gms::feature_service feature_service({get_disabled_features_from_db_config(cfg)});
feature_service.enable(feature_service.supported_feature_set()).get();
feature_service.views_with_tablets.enable();
sharded<locator::shared_token_metadata> token_metadata;
auto my_address = gms::inet_address("localhost");
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_metadata = deferred_stop(token_metadata);
data_dictionary_impl dd_impl;
database real_db(cfg, feature_service);
auto db = dd_impl.wrap(real_db);
// Mock system_schema keyspace to be able to parse modification statements
// against system_schema.dropped_columns.
real_db.keyspaces.emplace_back(make_lw_shared<keyspace_metadata>(
db::schema_tables::NAME,
"org.apache.cassandra.locator.LocalStrategy",
locator::replication_strategy_config_options{},
std::nullopt,
std::nullopt,
false));
real_db.tables.emplace_back(dd_impl, real_db.keyspaces.back(), db::schema_tables::dropped_columns(), false);
auto find_or_create_keyspace = [&] (const sstring& name) -> data_dictionary::keyspace {
try {
return db.find_keyspace(name);
} catch (replica::no_such_keyspace&) {
// fall-though to below
}
auto raw_statement = cql3::query_processor::parse_statement(
fmt::format("CREATE KEYSPACE {} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}", name),
cql3::dialect{});
auto prepared_statement = raw_statement->prepare(db, cql_stats);
auto* statement = prepared_statement->statement.get();
auto p = dynamic_cast<cql3::statements::create_keyspace_statement*>(statement);
SCYLLA_ASSERT(p);
real_db.keyspaces.emplace_back(p->get_keyspace_metadata(*token_metadata.local().get(), feature_service, cfg));
return db.find_keyspace(name);
};
std::vector<std::unique_ptr<cql3::statements::raw::parsed_statement>> raw_statements;
try {
raw_statements = cql3::query_processor::parse_statements(schema_str, cql3::dialect{});
} catch (...) {
throw std::runtime_error(format("tools:do_load_schemas(): failed to parse CQL statements: {}", std::current_exception()));
}
for (auto& raw_statement : raw_statements) {
auto cf_statement = dynamic_cast<cql3::statements::raw::cf_statement*>(raw_statement.get());
if (!cf_statement) {
continue; // we don't support any non-cf statements here
}
if (!cf_statement->has_keyspace()) {
throw std::runtime_error("tools::do_load_schemas(): CQL statement does not have keyspace specified");
}
auto ks = find_or_create_keyspace(cf_statement->keyspace());
auto prepared_statement = cf_statement->prepare(db, cql_stats);
auto* statement = prepared_statement->statement.get();
if (auto p = dynamic_cast<cql3::statements::create_keyspace_statement*>(statement)) {
real_db.keyspaces.emplace_back(p->get_keyspace_metadata(*token_metadata.local().get(), feature_service, cfg));
} else if (auto p = dynamic_cast<cql3::statements::create_type_statement*>(statement)) {
dd_impl.unwrap(ks).metadata->add_user_type(p->create_type(db));
} else if (auto p = dynamic_cast<cql3::statements::create_table_statement*>(statement)) {
auto schema = p->get_cf_meta_data(db);
// CDC tables use a custom partitioner, which is not reflected when
// dumping the schema to schema.cql, so we have to manually set it here.
if (cdc::is_log_name(schema->cf_name())) {
schema_builder b(std::move(schema));
b.with_partitioner(cdc::cdc_partitioner::classname);
schema = b.build();
}
real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), std::move(schema), true);
} else if (auto p = dynamic_cast<cql3::statements::create_view_statement*>(statement)) {
auto&& [view, warnings] = p->prepare_view(db, token_metadata.local().get());
auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { return t.schema->ks_name() == view->ks_name() && t.schema->cf_name() == view->cf_name(); });
if (it != real_db.tables.end()) {
continue; // view already exists
}
real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), view, true);
} else if (auto p = dynamic_cast<cql3::statements::create_index_statement*>(statement)) {
auto [res, warnings] = p->build_index_schema(db, token_metadata.local().get());
if (!res) {
continue; // index already exists
}
auto [new_base_schema, index] = *res;
auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { return t.schema->id() == new_base_schema->id(); });
if (it == real_db.tables.end()) { // shouldn't happen but let's handle it
throw std::runtime_error(fmt::format("tools::do_load_schemas(): failed to look up base table {}.{}, while creating index on it", new_base_schema->ks_name(), new_base_schema->cf_name()));
}
it->schema = std::move(new_base_schema);
it->secondary_idx_man.reload();
auto view = p->create_view_for_index(it->schema, index, db);
real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), view, true);
} else if (auto p = dynamic_cast<cql3::statements::update_statement*>(statement)) {
if (p->keyspace() != db::schema_tables::NAME && p->column_family() != db::schema_tables::DROPPED_COLUMNS) {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected modification statement to be against {}.{}, but it is against {}.{}",
db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, p->keyspace(), p->column_family()));
}
auto schema = db::schema_tables::dropped_columns();
cql3::statements::modification_statement::json_cache_opt json_cache{};
cql3::update_parameters params(schema, cql3::query_options::DEFAULT, api::new_timestamp(), schema->default_time_to_live(), cql3::update_parameters::prefetch_data(schema));
auto pkeys = p->build_partition_keys(cql3::query_options::DEFAULT, json_cache);
auto ckranges = p->create_clustering_ranges(cql3::query_options::DEFAULT, json_cache);
auto updates = p->apply_updates(pkeys, ckranges, params, json_cache);
if (updates.size() != 1) {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected one update per statement for {}.{}, got: {}",
db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, updates.size()));
}
auto& mut = updates.front();
if (!mut.partition().row_tombstones().empty()) {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected only update against {}.{}, not deletes",
db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS));
}
query::result_set rs(mut);
for (auto& row : rs.rows()) {
const auto keyspace_name = row.get_nonnull<sstring>("keyspace_name");
const auto table_name = row.get_nonnull<sstring>("table_name");
auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) {
auto& s = t.schema;
return s->ks_name() == keyspace_name && s->cf_name() == table_name;
});
if (it == real_db.tables.end()) {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): failed applying update to {}.{}, the table it applies to is not found: {}.{}",
db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, keyspace_name, table_name));
}
auto name = row.get_nonnull<sstring>("column_name");
auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull<sstring>("type"), user_types_storage(real_db));
auto time = row.get_nonnull<db_clock::time_point>("dropped_time");
it->schema = schema_builder(std::move(it->schema)).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build();
}
} else {
throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected statement to be one of (create keyspace, create type, create table), got: {}",
typeid(statement).name()));
}
}
return std::ranges::to<std::vector<schema_ptr>>(
real_db.tables |
std::views::filter([] (const table& t) { return t.user; }) |
std::views::transform([] (const table& t) { return t.schema; }));
}
class single_keyspace_user_types_storage : public data_dictionary::user_types_storage {
data_dictionary::user_types_metadata _utm;
public:
single_keyspace_user_types_storage(data_dictionary::user_types_metadata utm) : _utm(std::move(utm)) { }
virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override {
return _utm;
}
};
schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(rcs_sem);
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<sstable_manager_service> sst_man;
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
auto stop_sst_man_service = deferred_stop(sst_man);
auto schema_tables_path = scylla_data_path / db::schema_tables::NAME;
auto empty = [] (const mutation_opt& mopt) {
return !mopt || !mopt->partition().row_count();
};
auto do_load = [&] (std::function<const schema_ptr()> schema_factory) {
auto s = schema_factory();
return read_mutation_from_table_offline(
sst_man,
rcs_sem.make_tracking_only_permit(s, "schema_mutation", db::no_timeout, {}),
get_table_directory(scylla_data_path, s->ks_name(), s->cf_name()).get(),
keyspace,
schema_factory,
data_value(keyspace),
data_value(table));
};
mutation_opt tables = do_load(db::schema_tables::tables);
mutation_opt views = do_load(db::schema_tables::views);
mutation_opt columns = do_load(db::schema_tables::columns);
mutation_opt view_virtual_columns = do_load(db::schema_tables::view_virtual_columns);
mutation_opt computed_columns = do_load(db::schema_tables::computed_columns);
mutation_opt indexes = do_load(db::schema_tables::indexes);
mutation_opt dropped_columns = do_load(db::schema_tables::dropped_columns);
mutation_opt scylla_tables = do_load([] () { return db::schema_tables::scylla_tables(); });
if ((empty(tables) && empty(views)) || empty(columns)) {
throw std::runtime_error(fmt::format("Failed to find {}.{} in schema tables", keyspace, table));
}
data_dictionary::user_types_metadata utm;
auto types_schema = db::schema_tables::types();
auto types_mut = read_mutation_from_table_offline(
sst_man,
rcs_sem.make_tracking_only_permit(db::schema_tables::types(), "types_mutation", db::no_timeout, {}),
get_table_directory(scylla_data_path, types_schema->ks_name(), types_schema->cf_name()).get(),
keyspace,
db::schema_tables::types,
data_value(keyspace),
{});
if (types_mut) {
query::result_set result(*types_mut);
auto ks = make_lw_shared<keyspace_metadata>(keyspace, "org.apache.cassandra.locator.LocalStrategy", locator::replication_strategy_config_options{}, std::nullopt, std::nullopt, false);
db::cql_type_parser::raw_builder ut_builder(*ks);
auto get_list = [] (const query::result_set_row& row, const char* name) {
return row.get_nonnull<const list_type_impl::native_type&>(name)
| std::views::transform([] (const data_value& v) { return value_cast<sstring>(v); })
| std::ranges::to<std::vector<sstring>>();
};
for (const auto& row : result.rows()) {
const auto name = row.get_nonnull<sstring>("type_name");
const auto field_names = get_list(row, "field_names");
const auto field_types = get_list(row, "field_types");
ut_builder.add(name, field_names, field_types);
}
auto user_types = ut_builder.build().get();
for (auto&& ut : user_types) {
utm.add_type(std::move(ut));
}
}
auto user_type_storage = std::make_shared<single_keyspace_user_types_storage>(std::move(utm));
gms::feature_service features({get_disabled_features_from_db_config(dbcfg)});
db::schema_ctxt ctxt(dbcfg, user_type_storage, features);
if (empty(tables)) {
tables = std::move(views);
}
schema_mutations muts(std::move(*tables), std::move(*columns), std::move(view_virtual_columns), std::move(computed_columns), std::move(indexes),
std::move(dropped_columns), std::move(scylla_tables));
if (muts.is_view()) {
query::result_set rs(muts.columnfamilies_mutation());
const query::result_set_row& view_row = rs.row(0);
auto base_name = view_row.get_nonnull<sstring>("base_table_name");
auto base_schema = do_load_schema_from_schema_tables(dbcfg, scylla_data_path, keyspace, base_name);
return db::schema_tables::create_view_from_mutations(ctxt, muts, ctxt.user_types(), std::move(base_schema));
} else {
// not setting cdc_schema because the schema is mostly used for reading by tools
// and not for generating cdc mutations.
return db::schema_tables::create_table_from_mutations(ctxt, muts, ctxt.user_types(), nullptr);
}
}
std::string_view to_string_view(bytes_view b) {
return std::string_view(reinterpret_cast<const char*>(b.data()), b.size());
}
data_type parse_type(bytes_view type_name) {
return db::marshal::type_parser::parse(to_string_view(type_name));
}
schema_ptr load_schema_from_statistics(sstring keyspace, sstring table, const sstables::shared_sstable& sstable) {
const auto& serialization_header = sstable->get_serialization_header();
const auto& compression = sstable->get_compression();
auto builder = schema_builder(keyspace, table);
// partition key
{
const auto pk_type_name = serialization_header.pk_type_name.value;
const bytes composite_prefix = "org.apache.cassandra.db.marshal.CompositeType(";
if (pk_type_name.starts_with(composite_prefix)) {
const auto composite_content = pk_type_name.substr(composite_prefix.size(), pk_type_name.size() - composite_prefix.size() - 1);
db::marshal::type_parser parser(to_string_view(composite_content));
unsigned i = 0;
while (!parser.is_eos()) {
builder.with_column(to_bytes(format("$pk{}", i++)), parser.parse(), column_kind::partition_key);
parser.skip_blank_and_comma();
}
} else {
builder.with_column("$pk", parse_type(pk_type_name), column_kind::partition_key);
}
}
// clustering key
{
unsigned i = 0;
for (const auto& type_name : serialization_header.clustering_key_types_names.elements) {
builder.with_column(to_bytes(format("$ck{}", i++)), parse_type(type_name.value), column_kind::clustering_key);
}
}
// static columns
for (const auto& col_desc : serialization_header.static_columns.elements) {
builder.with_column(col_desc.name.value, parse_type(col_desc.type_name.value), column_kind::static_column);
}
// regular columns
for (const auto& col_desc : serialization_header.regular_columns.elements) {
builder.with_column(col_desc.name.value, parse_type(col_desc.type_name.value), column_kind::regular_column);
}
// compression options
builder.set_compressor_params(sstables::options_from_compression(compression));
return builder.build();
}
sstring disk_string_to_string(const sstables::disk_string<uint32_t>& ds) {
return sstring(ds.value.begin(), ds.value.end());
}
column_kind from_sstable_column_kind(sstables::sstable_column_kind k) {
switch (k) {
case sstables::sstable_column_kind::partition_key: return column_kind::partition_key;
case sstables::sstable_column_kind::clustering_key: return column_kind::clustering_key;
case sstables::sstable_column_kind::static_column: return column_kind::static_column;
case sstables::sstable_column_kind::regular_column: return column_kind::regular_column;
}
on_internal_error(sllog, format("from_sstable_column_kind(): unknown column kind {}", static_cast<std::underlying_type_t<sstables::sstable_column_kind>>(k)));
}
schema_ptr load_schema_from_scylla_metadata(const sstables::shared_sstable& sstable) {
const auto& scylla_metadata = *sstable->get_scylla_metadata();
const auto& sstable_schema = *scylla_metadata.data.get<sstables::scylla_metadata_type::Schema, sstables::scylla_metadata::sstable_schema>();
auto builder = schema_builder(disk_string_to_string(sstable_schema.keyspace_name), disk_string_to_string(sstable_schema.table_name));
builder.set_uuid(sstable_schema.id);
builder.with_version(sstable_schema.version);
for (const auto& col_desc : sstable_schema.columns.elements) {
builder.with_column(col_desc.name.value, parse_type(col_desc.type.value), from_sstable_column_kind(col_desc.kind));
}
// compression options
builder.set_compressor_params(sstables::options_from_compression(sstable->get_compression()));
return builder.build();
}
schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem::path sstable_path, sstring keyspace, sstring table) {
if (keyspace.empty()) {
keyspace = "my_keyspace";
}
if (table.empty()) {
table = "my_table";
}
db::nop_large_data_handler large_data_handler;
db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no);
gms::feature_service feature_service({get_disabled_features_from_db_config(dbcfg)});
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
abort_source abort;
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sstables::sstables_manager::config sm_cfg {
.available_memory = memory::stats().total_memory(),
.data_file_directories = dbcfg.data_file_directories(),
};
sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, corrupt_data_handler, sm_cfg, feature_service, tracker,
dir_sem,
[host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort, dbcfg.extensions().sstable_file_io_extensions());
auto close_sst_man = deferred_close(sst_man);
schema_ptr bootstrap_schema = schema_builder(keyspace, table).with_column("pk", int32_type, column_kind::partition_key).build();
const auto ed = sstables::parse_path(sstable_path, keyspace, table);
const auto dir_path = sstable_path.parent_path();
auto local = data_dictionary::make_local_options(dir_path);
auto bootstrap_sst = sst_man.make_sstable(bootstrap_schema, local, ed.generation, sstables::sstable_state::normal, ed.version, ed.format);
bootstrap_sst->load_metadata().get();
const auto scylla_metadata = bootstrap_sst->get_scylla_metadata();
if (scylla_metadata && scylla_metadata->data.get<sstables::scylla_metadata_type::Schema, sstables::scylla_metadata::sstable_schema>()) {
return load_schema_from_scylla_metadata(bootstrap_sst);
}
return load_schema_from_statistics(keyspace, table, bootstrap_sst);
}
} // anonymous namespace
namespace tools {
future<std::vector<schema_ptr>> load_schemas(const db::config& dbcfg, std::string_view schema_str) {
return async([&dbcfg, schema_str] () mutable {
return do_load_schemas(dbcfg, schema_str);
});
}
future<schema_ptr> load_one_schema_from_file(const db::config& dbcfg, std::filesystem::path path) {
return async([&dbcfg, path] () mutable {
auto schemas = do_load_schemas(dbcfg, read_file(path));
if (schemas.size() == 1) {
return std::move(schemas.front());
} else if (schemas.size() == 2) {
// We expect a base table at index 0 and a view/index on it at index 1
if (!schemas[0]->is_view() && schemas[1]->is_view() && schemas[0]->id() == schemas[1]->view_info()->base_id()) {
return std::move(schemas[1]);
}
}
throw std::runtime_error(fmt::format(
"Schema file {} expected to contain exactly 1 schema or 2 schemas (base table and view), actually has {} non-related schemas",
path.native(),
schemas.size()));
});
}
schema_ptr load_system_schema(const db::config& cfg, std::string_view keyspace, std::string_view table) {
std::unordered_map<std::string_view, std::vector<schema_ptr>> schemas{
{db::schema_tables::NAME, db::schema_tables::all_tables(db::schema_features::full())},
{db::system_keyspace::NAME, db::system_keyspace::all_tables(cfg)},
{db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::all_distributed_tables()},
{db::system_distributed_keyspace::NAME_EVERYWHERE, db::system_distributed_keyspace::all_everywhere_tables()},
};
auto ks_it = schemas.find(keyspace);
if (ks_it == schemas.end()) {
throw std::invalid_argument(fmt::format("unknown system keyspace: {}", keyspace));
}
auto tb_it = std::ranges::find_if(ks_it->second, [&] (const schema_ptr& s) {
return s->cf_name() == table;
});
if (tb_it == ks_it->second.end()) {
throw std::invalid_argument(fmt::format("unknown table {} in system keyspace: {}", table, keyspace));
}
return *tb_it;
}
future<schema_ptr> load_schema_from_schema_tables(const db::config& cfg, std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
return async([=, &cfg] () mutable {
return do_load_schema_from_schema_tables(cfg, scylla_data_path, keyspace, table);
});
}
future<schema_ptr> load_schema_from_sstable(const db::config& cfg, std::filesystem::path sstable_path, std::string_view keyspace, std::string_view table) {
return async([=, &cfg, sstable_path = std::move(sstable_path)] () mutable {
return do_load_schema_from_sstable(cfg, std::move(sstable_path), sstring(keyspace), sstring(table));
});
}
} // namespace tools