/* * Copyright (C) 2021-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "utils/assert.hh" #include #include #include #include #include #include #include "cdc/cdc_partitioner.hh" #include "cdc/log.hh" #include "cdc/cdc_options.hh" #include "cql3/query_processor.hh" #include "cql3/statements/create_keyspace_statement.hh" #include "cql3/statements/create_table_statement.hh" #include "cql3/statements/create_type_statement.hh" #include "cql3/statements/create_view_statement.hh" #include "cql3/statements/create_index_statement.hh" #include "cql3/statements/update_statement.hh" #include "db/cql_type_parser.hh" #include "db/config.hh" #include "db/extensions.hh" #include "db/large_data_handler.hh" #include "db/corrupt_data_handler.hh" #include "db/system_distributed_keyspace.hh" #include "db/schema_tables.hh" #include "db/system_keyspace.hh" #include "partition_slice_builder.hh" #include "readers/combined.hh" #include "replica/database.hh" #include "sstables/sstables_manager.hh" #include "sstables/sstable_directory.hh" #include "types/list.hh" #include "data_dictionary/impl.hh" #include "data_dictionary/data_dictionary.hh" #include "gms/feature_service.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/local_strategy.hh" #include "tools/schema_loader.hh" #include "tools/read_mutation.hh" #include "view_info.hh" namespace { logging::logger sllog("schema_loader"); class data_dictionary_impl; struct keyspace; struct table; struct database { const db::config& cfg; gms::feature_service& features; std::list keyspaces; std::list tables; database(const db::config& cfg, gms::feature_service& features) : cfg(cfg), features(features) { } database(database&&) = delete; }; struct keyspace { lw_shared_ptr metadata; explicit keyspace(lw_shared_ptr metadata) : metadata(std::move(metadata)) { } keyspace(keyspace&&) = delete; }; struct table { const keyspace& ks; schema_ptr schema; secondary_index::secondary_index_manager secondary_idx_man; bool user; table(data_dictionary_impl& impl, const keyspace& ks, schema_ptr schema, bool user); table(table&&) = delete; }; class data_dictionary_impl : public data_dictionary::impl { public: const data_dictionary::database wrap(const database& db) const { return make_database(this, &db); } data_dictionary::keyspace wrap(const keyspace& ks) const { return make_keyspace(this, &ks); } data_dictionary::table wrap(const table& t) const { return make_table(this, &t); } static const database& unwrap(data_dictionary::database db) { return *static_cast(extract(db)); } static const keyspace& unwrap(data_dictionary::keyspace ks) { return *static_cast(extract(ks)); } static const table& unwrap(data_dictionary::table t) { return *static_cast(extract(t)); } private: virtual const table_schema_version& get_version(data_dictionary::database) const override { throw std::bad_function_call(); } virtual std::optional try_find_keyspace(data_dictionary::database db, std::string_view name) const override { auto& keyspaces = unwrap(db).keyspaces; auto it = std::find_if(keyspaces.begin(), keyspaces.end(), [name] (const keyspace& ks) { return ks.metadata->name() == name; }); if (it == keyspaces.end()) { return {}; } return wrap(*it); } virtual std::vector get_keyspaces(data_dictionary::database db) const override { return unwrap(db).keyspaces | std::views::transform([this] (const keyspace& ks) { return wrap(ks); }) | std::ranges::to>(); } virtual std::vector get_user_keyspaces(data_dictionary::database db) const override { return std::ranges::to>( unwrap(db).keyspaces | std::views::transform([] (const keyspace& ks) { return ks.metadata->name(); }) | std::views::filter([] (const sstring& ks) {return !is_internal_keyspace(ks); }) ); } virtual std::vector get_all_keyspaces(data_dictionary::database db) const override { return unwrap(db).keyspaces | std::views::transform([] (const keyspace& ks) { return ks.metadata->name(); }) | std::ranges::to>(); } virtual std::vector get_tables(data_dictionary::database db) const override { return unwrap(db).tables | std::views::transform([this] (const table& ks) { return wrap(ks); }) | std::ranges::to>(); } virtual std::optional try_find_table(data_dictionary::database db, std::string_view ks, std::string_view tab) const override { auto& tables = unwrap(db).tables; auto it = std::find_if(tables.begin(), tables.end(), [ks, tab] (const table& tbl) { return tbl.schema->ks_name() == ks && tbl.schema->cf_name() == tab; }); if (it == tables.end()) { return {}; } return wrap(*it); } virtual std::optional try_find_table(data_dictionary::database db, table_id id) const override { auto& tables = unwrap(db).tables; auto it = std::find_if(tables.begin(), tables.end(), [id] (const table& tbl) { return tbl.schema->id() == id; }); if (it == tables.end()) { return {}; } return wrap(*it); } virtual const secondary_index::secondary_index_manager& get_index_manager(data_dictionary::table t) const override { return unwrap(t).secondary_idx_man; } virtual schema_ptr get_table_schema(data_dictionary::table t) const override { return unwrap(t).schema; } virtual db_clock::time_point get_truncation_time(data_dictionary::table t) const override { return {}; } virtual lw_shared_ptr get_keyspace_metadata(data_dictionary::keyspace ks) const override { return unwrap(ks).metadata; } virtual bool is_internal(data_dictionary::keyspace ks) const override { return is_system_keyspace(unwrap(ks).metadata->name()); } virtual const locator::abstract_replication_strategy& get_replication_strategy(data_dictionary::keyspace ks) const override { static const locator::local_strategy strategy{locator::replication_strategy_params{locator::replication_strategy_config_options{}, {}, data_dictionary::consistency_config_option::eventual}, nullptr}; return strategy; } virtual const std::vector& get_table_views(data_dictionary::table t) const override { static const std::vector empty; return empty; } virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view cf_name, std::optional index_name_root) const override { auto has_schema = [&] (std::string_view ks_name, std::string_view table_name) { const auto& tables = unwrap(db).tables; return std::find_if(tables.begin(), tables.end(), [&] (const table& t) { return t.schema->ks_name() == ks_name && t.schema->cf_name() == table_name; }) != tables.end(); }; return secondary_index::get_available_index_name(ks_name, cf_name, index_name_root, existing_index_names(db, ks_name), has_schema); } virtual std::set existing_index_names(data_dictionary::database db, std::string_view ks_name, std::string_view cf_to_exclude = {}) const override { auto tables = std::ranges::to>(unwrap(db).tables | std::views::filter([ks_name] (const table& t) { return t.schema->ks_name() == ks_name; }) | std::views::transform([] (const table& t) { return t.schema; })); return secondary_index::existing_index_names(tables, cf_to_exclude); } virtual schema_ptr find_indexed_table(data_dictionary::database db, std::string_view ks_name, std::string_view index_name) const override { return {}; } virtual schema_ptr get_cdc_base_table(data_dictionary::database db, const schema&) const override { return {}; } virtual const db::config& get_config(data_dictionary::database db) const override { return unwrap(db).cfg; } virtual const db::extensions& get_extensions(data_dictionary::database db) const override { return get_config(db).extensions(); } virtual const gms::feature_service& get_features(data_dictionary::database db) const override { return unwrap(db).features; } virtual replica::database& real_database(data_dictionary::database db) const override { throw std::bad_function_call(); } virtual replica::database* real_database_ptr(data_dictionary::database db) const override { return nullptr; } }; class user_types_storage : public data_dictionary::user_types_storage { database& _db; public: user_types_storage(database& db) noexcept : _db(db) {} virtual const data_dictionary::user_types_metadata& get(const sstring& name) const override { for (const auto& ks : _db.keyspaces) { if (ks.metadata->name() == name) { return ks.metadata->user_types(); } } throw data_dictionary::no_such_keyspace(name); } }; table::table(data_dictionary_impl& impl, const keyspace& ks, schema_ptr schema, bool user) : ks(ks), schema(std::move(schema)), secondary_idx_man(impl.wrap(*this)), user(user) { } sstring read_file(std::filesystem::path path) { auto file = open_file_dma(path.native(), open_flags::ro).get(); auto fstream = make_file_input_stream(file); return util::read_entire_stream_contiguous(fstream).get(); } std::vector do_load_schemas(const db::config& cfg, std::string_view schema_str) { cql3::cql_stats cql_stats; gms::feature_service feature_service({get_disabled_features_from_db_config(cfg)}); feature_service.enable(feature_service.supported_feature_set()).get(); feature_service.views_with_tablets.enable(); sharded token_metadata; auto my_address = gms::inet_address("localhost"); locator::token_metadata::config tm_cfg; tm_cfg.topo_cfg.this_endpoint = my_address; tm_cfg.topo_cfg.this_cql_address = my_address; tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location; token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get(); auto stop_token_metadata = deferred_stop(token_metadata); data_dictionary_impl dd_impl; database real_db(cfg, feature_service); auto db = dd_impl.wrap(real_db); // Mock system_schema keyspace to be able to parse modification statements // against system_schema.dropped_columns. real_db.keyspaces.emplace_back(make_lw_shared( db::schema_tables::NAME, "org.apache.cassandra.locator.LocalStrategy", locator::replication_strategy_config_options{}, std::nullopt, std::nullopt, false)); real_db.tables.emplace_back(dd_impl, real_db.keyspaces.back(), db::schema_tables::dropped_columns(), false); auto find_or_create_keyspace = [&] (const sstring& name) -> data_dictionary::keyspace { try { return db.find_keyspace(name); } catch (replica::no_such_keyspace&) { // fall-though to below } auto raw_statement = cql3::query_processor::parse_statement( fmt::format("CREATE KEYSPACE {} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}", name), cql3::dialect{}); auto prepared_statement = raw_statement->prepare(db, cql_stats); auto* statement = prepared_statement->statement.get(); auto p = dynamic_cast(statement); SCYLLA_ASSERT(p); real_db.keyspaces.emplace_back(p->get_keyspace_metadata(*token_metadata.local().get(), feature_service, cfg)); return db.find_keyspace(name); }; std::vector> raw_statements; try { raw_statements = cql3::query_processor::parse_statements(schema_str, cql3::dialect{}); } catch (...) { throw std::runtime_error(format("tools:do_load_schemas(): failed to parse CQL statements: {}", std::current_exception())); } for (auto& raw_statement : raw_statements) { auto cf_statement = dynamic_cast(raw_statement.get()); if (!cf_statement) { continue; // we don't support any non-cf statements here } if (!cf_statement->has_keyspace()) { throw std::runtime_error("tools::do_load_schemas(): CQL statement does not have keyspace specified"); } auto ks = find_or_create_keyspace(cf_statement->keyspace()); auto prepared_statement = cf_statement->prepare(db, cql_stats); auto* statement = prepared_statement->statement.get(); if (auto p = dynamic_cast(statement)) { real_db.keyspaces.emplace_back(p->get_keyspace_metadata(*token_metadata.local().get(), feature_service, cfg)); } else if (auto p = dynamic_cast(statement)) { dd_impl.unwrap(ks).metadata->add_user_type(p->create_type(db)); } else if (auto p = dynamic_cast(statement)) { auto schema = p->get_cf_meta_data(db); // CDC tables use a custom partitioner, which is not reflected when // dumping the schema to schema.cql, so we have to manually set it here. if (cdc::is_log_name(schema->cf_name())) { schema_builder b(std::move(schema)); b.with_partitioner(cdc::cdc_partitioner::classname); schema = b.build(); } real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), std::move(schema), true); } else if (auto p = dynamic_cast(statement)) { auto&& [view, warnings] = p->prepare_view(db); auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { return t.schema->ks_name() == view->ks_name() && t.schema->cf_name() == view->cf_name(); }); if (it != real_db.tables.end()) { continue; // view already exists } real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), view, true); } else if (auto p = dynamic_cast(statement)) { auto res = p->build_index_schema(db); if (!res) { continue; // index already exists } auto [new_base_schema, index] = *res; auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { return t.schema->id() == new_base_schema->id(); }); if (it == real_db.tables.end()) { // shouldn't happen but let's handle it throw std::runtime_error(fmt::format("tools::do_load_schemas(): failed to look up base table {}.{}, while creating index on it", new_base_schema->ks_name(), new_base_schema->cf_name())); } it->schema = std::move(new_base_schema); it->secondary_idx_man.reload(); auto view = p->create_view_for_index(it->schema, index, db); real_db.tables.emplace_back(dd_impl, dd_impl.unwrap(ks), view, true); } else if (auto p = dynamic_cast(statement)) { if (p->keyspace() != db::schema_tables::NAME && p->column_family() != db::schema_tables::DROPPED_COLUMNS) { throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected modification statement to be against {}.{}, but it is against {}.{}", db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, p->keyspace(), p->column_family())); } auto schema = db::schema_tables::dropped_columns(); cql3::statements::modification_statement::json_cache_opt json_cache{}; cql3::update_parameters params(schema, cql3::query_options::DEFAULT, api::new_timestamp(), schema->default_time_to_live(), cql3::update_parameters::prefetch_data(schema)); auto pkeys = p->build_partition_keys(cql3::query_options::DEFAULT, json_cache); auto ckranges = p->create_clustering_ranges(cql3::query_options::DEFAULT, json_cache); auto updates = p->apply_updates(pkeys, ckranges, params, json_cache); if (updates.size() != 1) { throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected one update per statement for {}.{}, got: {}", db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, updates.size())); } auto& mut = updates.front(); if (!mut.partition().row_tombstones().empty()) { throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected only update against {}.{}, not deletes", db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS)); } query::result_set rs(mut); for (auto& row : rs.rows()) { const auto keyspace_name = row.get_nonnull("keyspace_name"); const auto table_name = row.get_nonnull("table_name"); auto it = std::find_if(real_db.tables.begin(), real_db.tables.end(), [&] (const table& t) { auto& s = t.schema; return s->ks_name() == keyspace_name && s->cf_name() == table_name; }); if (it == real_db.tables.end()) { throw std::runtime_error(fmt::format("tools::do_load_schemas(): failed applying update to {}.{}, the table it applies to is not found: {}.{}", db::schema_tables::NAME, db::schema_tables::DROPPED_COLUMNS, keyspace_name, table_name)); } auto name = row.get_nonnull("column_name"); auto type = db::cql_type_parser::parse(keyspace_name, row.get_nonnull("type"), user_types_storage(real_db)); auto time = row.get_nonnull("dropped_time"); it->schema = schema_builder(std::move(it->schema)).without_column(std::move(name), std::move(type), time.time_since_epoch().count()).build(); } } else { throw std::runtime_error(fmt::format("tools::do_load_schemas(): expected statement to be one of (create keyspace, create type, create table), got: {}", typeid(statement).name())); } } return std::ranges::to>( real_db.tables | std::views::filter([] (const table& t) { return t.user; }) | std::views::transform([] (const table& t) { return t.schema; })); } class single_keyspace_user_types_storage : public data_dictionary::user_types_storage { data_dictionary::user_types_metadata _utm; public: single_keyspace_user_types_storage(data_dictionary::user_types_metadata utm) : _utm(std::move(utm)) { } virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override { return _utm; } }; schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) { reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no); auto stop_semaphore = deferred_stop(rcs_sem); auto scf = make_sstable_compressor_factory_for_tests_in_thread(); sharded sst_man; sst_man.start(std::ref(dbcfg), std::ref(*scf)).get(); auto stop_sst_man_service = deferred_stop(sst_man); auto schema_tables_path = scylla_data_path / db::schema_tables::NAME; auto empty = [] (const mutation_opt& mopt) { return !mopt || !mopt->partition().row_count(); }; auto do_load = [&] (std::function schema_factory) { auto s = schema_factory(); return read_mutation_from_table_offline( sst_man, rcs_sem.make_tracking_only_permit(s, "schema_mutation", db::no_timeout, {}), get_table_directory(scylla_data_path, s->ks_name(), s->cf_name()).get(), keyspace, schema_factory, data_value(keyspace), data_value(table)); }; mutation_opt tables = do_load(db::schema_tables::tables); mutation_opt views = do_load(db::schema_tables::views); mutation_opt columns = do_load(db::schema_tables::columns); mutation_opt view_virtual_columns = do_load(db::schema_tables::view_virtual_columns); mutation_opt computed_columns = do_load(db::schema_tables::computed_columns); mutation_opt indexes = do_load(db::schema_tables::indexes); mutation_opt dropped_columns = do_load(db::schema_tables::dropped_columns); mutation_opt scylla_tables = do_load([] () { return db::schema_tables::scylla_tables(); }); if ((empty(tables) && empty(views)) || empty(columns)) { throw std::runtime_error(fmt::format("Failed to find {}.{} in schema tables", keyspace, table)); } data_dictionary::user_types_metadata utm; auto types_schema = db::schema_tables::types(); auto types_mut = read_mutation_from_table_offline( sst_man, rcs_sem.make_tracking_only_permit(db::schema_tables::types(), "types_mutation", db::no_timeout, {}), get_table_directory(scylla_data_path, types_schema->ks_name(), types_schema->cf_name()).get(), keyspace, db::schema_tables::types, data_value(keyspace), {}); if (types_mut) { query::result_set result(*types_mut); auto ks = make_lw_shared(keyspace, "org.apache.cassandra.locator.LocalStrategy", locator::replication_strategy_config_options{}, std::nullopt, std::nullopt, false); db::cql_type_parser::raw_builder ut_builder(*ks); auto get_list = [] (const query::result_set_row& row, const char* name) { return row.get_nonnull(name) | std::views::transform([] (const data_value& v) { return value_cast(v); }) | std::ranges::to>(); }; for (const auto& row : result.rows()) { const auto name = row.get_nonnull("type_name"); const auto field_names = get_list(row, "field_names"); const auto field_types = get_list(row, "field_types"); ut_builder.add(name, field_names, field_types); } auto user_types = ut_builder.build().get(); for (auto&& ut : user_types) { utm.add_type(std::move(ut)); } } auto user_type_storage = std::make_shared(std::move(utm)); gms::feature_service features({get_disabled_features_from_db_config(dbcfg)}); db::schema_ctxt ctxt(dbcfg, user_type_storage, features); if (empty(tables)) { tables = std::move(views); } schema_mutations muts(std::move(*tables), std::move(*columns), std::move(view_virtual_columns), std::move(computed_columns), std::move(indexes), std::move(dropped_columns), std::move(scylla_tables)); if (muts.is_view()) { query::result_set rs(muts.columnfamilies_mutation()); const query::result_set_row& view_row = rs.row(0); auto base_name = view_row.get_nonnull("base_table_name"); auto base_schema = do_load_schema_from_schema_tables(dbcfg, scylla_data_path, keyspace, base_name); return db::schema_tables::create_view_from_mutations(ctxt, muts, ctxt.user_types(), std::move(base_schema)); } else { // not setting cdc_schema because the schema is mostly used for reading by tools // and not for generating cdc mutations. return db::schema_tables::create_table_from_mutations(ctxt, muts, ctxt.user_types(), nullptr); } } std::string_view to_string_view(bytes_view b) { return std::string_view(reinterpret_cast(b.data()), b.size()); } data_type parse_type(bytes_view type_name) { return db::marshal::type_parser::parse(to_string_view(type_name)); } schema_ptr load_schema_from_statistics(sstring keyspace, sstring table, const sstables::shared_sstable& sstable) { const auto& serialization_header = sstable->get_serialization_header(); const auto& compression = sstable->get_compression(); auto builder = schema_builder(keyspace, table); // partition key { const auto pk_type_name = serialization_header.pk_type_name.value; const bytes composite_prefix = "org.apache.cassandra.db.marshal.CompositeType("; if (pk_type_name.starts_with(composite_prefix)) { const auto composite_content = pk_type_name.substr(composite_prefix.size(), pk_type_name.size() - composite_prefix.size() - 1); db::marshal::type_parser parser(to_string_view(composite_content)); unsigned i = 0; while (!parser.is_eos()) { builder.with_column(to_bytes(format("$pk{}", i++)), parser.parse(), column_kind::partition_key); parser.skip_blank_and_comma(); } } else { builder.with_column("$pk", parse_type(pk_type_name), column_kind::partition_key); } } // clustering key { unsigned i = 0; for (const auto& type_name : serialization_header.clustering_key_types_names.elements) { builder.with_column(to_bytes(format("$ck{}", i++)), parse_type(type_name.value), column_kind::clustering_key); } } // static columns for (const auto& col_desc : serialization_header.static_columns.elements) { builder.with_column(col_desc.name.value, parse_type(col_desc.type_name.value), column_kind::static_column); } // regular columns for (const auto& col_desc : serialization_header.regular_columns.elements) { builder.with_column(col_desc.name.value, parse_type(col_desc.type_name.value), column_kind::regular_column); } // compression options builder.set_compressor_params(sstables::options_from_compression(compression)); return builder.build(); } sstring disk_string_to_string(const sstables::disk_string& ds) { return sstring(ds.value.begin(), ds.value.end()); } column_kind from_sstable_column_kind(sstables::sstable_column_kind k) { switch (k) { case sstables::sstable_column_kind::partition_key: return column_kind::partition_key; case sstables::sstable_column_kind::clustering_key: return column_kind::clustering_key; case sstables::sstable_column_kind::static_column: return column_kind::static_column; case sstables::sstable_column_kind::regular_column: return column_kind::regular_column; } on_internal_error(sllog, format("from_sstable_column_kind(): unknown column kind {}", static_cast>(k))); } schema_ptr load_schema_from_scylla_metadata(const sstables::shared_sstable& sstable) { const auto& scylla_metadata = *sstable->get_scylla_metadata(); const auto& sstable_schema = *scylla_metadata.data.get(); auto builder = schema_builder(disk_string_to_string(sstable_schema.keyspace_name), disk_string_to_string(sstable_schema.table_name)); builder.set_uuid(sstable_schema.id); builder.with_version(sstable_schema.version); for (const auto& col_desc : sstable_schema.columns.elements) { builder.with_column(col_desc.name.value, parse_type(col_desc.type.value), from_sstable_column_kind(col_desc.kind)); } // compression options builder.set_compressor_params(sstables::options_from_compression(sstable->get_compression())); return builder.build(); } schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem::path sstable_path, sstring keyspace, sstring table) { if (keyspace.empty()) { keyspace = "my_keyspace"; } if (table.empty()) { table = "my_table"; } db::nop_large_data_handler large_data_handler; db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no); gms::feature_service feature_service({get_disabled_features_from_db_config(dbcfg)}); cache_tracker tracker; sstables::directory_semaphore dir_sem(1); abort_source abort; auto scf = make_sstable_compressor_factory_for_tests_in_thread(); sstables::sstables_manager::config sm_cfg { .available_memory = memory::stats().total_memory(), .data_file_directories = dbcfg.data_file_directories(), }; sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, corrupt_data_handler, sm_cfg, feature_service, tracker, dir_sem, [host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort, dbcfg.extensions().sstable_file_io_extensions()); auto close_sst_man = deferred_close(sst_man); schema_ptr bootstrap_schema = schema_builder(keyspace, table).with_column("pk", int32_type, column_kind::partition_key).build(); const auto ed = sstables::parse_path(sstable_path, keyspace, table); const auto dir_path = sstable_path.parent_path(); auto local = data_dictionary::make_local_options(dir_path); auto bootstrap_sst = sst_man.make_sstable(bootstrap_schema, local, ed.generation, sstables::sstable_state::normal, ed.version, ed.format); bootstrap_sst->load_metadata().get(); const auto scylla_metadata = bootstrap_sst->get_scylla_metadata(); if (scylla_metadata && scylla_metadata->data.get()) { return load_schema_from_scylla_metadata(bootstrap_sst); } return load_schema_from_statistics(keyspace, table, bootstrap_sst); } } // anonymous namespace namespace tools { future> load_schemas(const db::config& dbcfg, std::string_view schema_str) { return async([&dbcfg, schema_str] () mutable { return do_load_schemas(dbcfg, schema_str); }); } future load_one_schema_from_file(const db::config& dbcfg, std::filesystem::path path) { return async([&dbcfg, path] () mutable { auto schemas = do_load_schemas(dbcfg, read_file(path)); if (schemas.size() == 1) { return std::move(schemas.front()); } else if (schemas.size() == 2) { // We expect a base table at index 0 and a view/index on it at index 1 if (!schemas[0]->is_view() && schemas[1]->is_view() && schemas[0]->id() == schemas[1]->view_info()->base_id()) { return std::move(schemas[1]); } } throw std::runtime_error(fmt::format( "Schema file {} expected to contain exactly 1 schema or 2 schemas (base table and view), actually has {} non-related schemas", path.native(), schemas.size())); }); } schema_ptr load_system_schema(const db::config& cfg, std::string_view keyspace, std::string_view table) { std::unordered_map> schemas{ {db::schema_tables::NAME, db::schema_tables::all_tables(db::schema_features::full())}, {db::system_keyspace::NAME, db::system_keyspace::all_tables(cfg)}, {db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::all_distributed_tables()}, {db::system_distributed_keyspace::NAME_EVERYWHERE, db::system_distributed_keyspace::all_everywhere_tables()}, }; auto ks_it = schemas.find(keyspace); if (ks_it == schemas.end()) { throw std::invalid_argument(fmt::format("unknown system keyspace: {}", keyspace)); } auto tb_it = std::ranges::find_if(ks_it->second, [&] (const schema_ptr& s) { return s->cf_name() == table; }); if (tb_it == ks_it->second.end()) { throw std::invalid_argument(fmt::format("unknown table {} in system keyspace: {}", table, keyspace)); } return *tb_it; } future load_schema_from_schema_tables(const db::config& cfg, std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) { return async([=, &cfg] () mutable { return do_load_schema_from_schema_tables(cfg, scylla_data_path, keyspace, table); }); } future load_schema_from_sstable(const db::config& cfg, std::filesystem::path sstable_path, std::string_view keyspace, std::string_view table) { return async([=, &cfg, sstable_path = std::move(sstable_path)] () mutable { return do_load_schema_from_sstable(cfg, std::move(sstable_path), sstring(keyspace), sstring(table)); }); } } // namespace tools