diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc
index 0233dd0792..596e4450b6 100644
--- a/tools/schema_loader.cc
+++ b/tools/schema_loader.cc
@@ -23,10 +23,15 @@
 #include "db/cql_type_parser.hh"
 #include "db/config.hh"
 #include "db/extensions.hh"
+#include "db/large_data_handler.hh"
 #include "db/system_distributed_keyspace.hh"
 #include "db/schema_tables.hh"
 #include "db/system_keyspace.hh"
+#include "partition_slice_builder.hh"
+#include "readers/combined.hh"
 #include "replica/database.hh"
+#include "sstables/sstables_manager.hh"
+#include "types/list.hh"
 #include "data_dictionary/impl.hh"
 #include "data_dictionary/data_dictionary.hh"
 #include "gms/feature_service.hh"
@@ -319,6 +324,247 @@ std::vector<schema_ptr> do_load_schemas(std::string_view schema_str) {
     return schemas;
 }
 
+/// Bundles an sstables::sstables_manager with the objects it is constructed
+/// from. Members are declared in dependency order: sst_man comes last because
+/// its initializer uses all the members before it.
+struct sstable_manager_service {
+    db::nop_large_data_handler large_data_handler;
+    db::config dbcfg;
+    gms::feature_service feature_service;
+    cache_tracker tracker;
+    sstables::directory_semaphore dir_sem;
+    sstables::sstables_manager sst_man;
+
+    explicit sstable_manager_service()
+        : feature_service(gms::feature_config_from_db_config(dbcfg))
+        , dir_sem(1)
+        , sst_man(large_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem) {
+    }
+
+    future<> stop() {
+        return sst_man.close();
+    }
+};
+
+/// Reads one partition of a schema table directly from its sstables.
+///
+/// Loads every sstable found in \p schema_table_data_path and reads the
+/// partition keyed by \p keyspace, restricted to the clustering range [ck, ck]
+/// where ck is built from \p ck_strings (which may be empty).
+///
+/// Returns a disengaged mutation_opt if the directory contains no sstables.
+/// Waits on futures with get(), so it has to be called in a seastar thread
+/// (load_schema_from_schema_tables() runs the whole call chain inside async()).
+mutation_opt read_schema_table_mutation(sharded<sstable_manager_service>& sst_man, std::filesystem::path schema_table_data_path,
+        std::function<schema_ptr()> schema_factory, reader_permit permit, std::string_view keyspace, std::vector<std::string_view> ck_strings) {
+
+    sharded<sstables::sstable_directory> sst_dirs;
+    sst_dirs.start(
+        sharded_parameter([&sst_man] { return std::ref(sst_man.local().sst_man); }),
+        sharded_parameter([&schema_factory] { return schema_factory(); }),
+        sharded_parameter([] { return make_lw_shared<const data_dictionary::storage_options>(); }),
+        schema_table_data_path,
+        sharded_parameter([] { return default_priority_class(); }),
+        sharded_parameter([] { return default_io_error_handler_gen(); })).get();
+    auto stop_sst_dirs = deferred_stop(sst_dirs);
+
+    auto sstable_open_infos = sst_dirs.map_reduce0(
+        [] (sstables::sstable_directory& sst_dir) -> future<std::vector<sstables::foreign_sstable_open_info>> {
+            co_await sst_dir.process_sstable_dir(sstables::sstable_directory::process_flags{ .sort_sstables_according_to_owner = false });
+            const auto& unsorted_ssts = sst_dir.get_unsorted_sstables();
+            std::vector<sstables::foreign_sstable_open_info> open_infos;
+            open_infos.reserve(unsorted_ssts.size());
+            for (auto& sst : unsorted_ssts) {
+                open_infos.push_back(co_await sst->get_open_info());
+            }
+            co_return open_infos;
+        },
+        std::vector<sstables::foreign_sstable_open_info>{},
+        [] (std::vector<sstables::foreign_sstable_open_info> a, std::vector<sstables::foreign_sstable_open_info> b) {
+            std::move(b.begin(), b.end(), std::back_inserter(a));
+            return a;
+        }).get();
+
+    auto schema_table_schema = schema_factory();
+
+    if (sstable_open_infos.empty()) {
+        return {};
+    }
+
+    std::vector<sstables::shared_sstable> sstables;
+    sstables.reserve(sstable_open_infos.size());
+    for (auto& open_info : sstable_open_infos) {
+        sstables.push_back(sst_dirs.local().load_foreign_sstable(open_info).get());
+    }
+
+    auto pk = partition_key::from_deeply_exploded(*schema_table_schema, {data_value(keyspace)});
+    auto dk = dht::decorate_key(*schema_table_schema, pk);
+    auto pr = dht::partition_range::make_singular(dk);
+
+    std::vector<data_value> raw_ck_values;
+    raw_ck_values.reserve(ck_strings.size());
+    for (const auto& ck_str : ck_strings) {
+        raw_ck_values.push_back(data_value(ck_str));
+    }
+    auto ck = clustering_key::from_deeply_exploded(*schema_table_schema, raw_ck_values);
+    auto cr = query::clustering_range::make({ck, true}, {ck, true});
+    auto ps = partition_slice_builder(*schema_table_schema)
+            .with_range(cr)
+            .build();
+
+    std::vector<flat_mutation_reader_v2> readers;
+    readers.reserve(sstables.size());
+    for (const auto& sst : sstables) {
+        readers.emplace_back(sst->make_reader(schema_table_schema, permit, pr, ps));
+    }
+    auto reader = make_combined_reader(schema_table_schema, permit, std::move(readers));
+    // Readers have to be closed before they are destroyed; the deferred action
+    // also covers the case where reading the mutation throws.
+    auto close_reader = deferred_close(reader);
+
+    return read_mutation_from_flat_mutation_reader(reader).get();
+}
+
+/// user_types_storage which returns the same user_types_metadata regardless of
+/// the keyspace asked for.
+class single_keyspace_user_types_storage : public data_dictionary::user_types_storage {
+    data_dictionary::user_types_metadata _utm;
+public:
+    single_keyspace_user_types_storage(data_dictionary::user_types_metadata utm) : _utm(std::move(utm)) { }
+    virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override {
+        return _utm;
+    }
+};
+
+/// Maps each schema table needed for loading a schema to the name of its
+/// directory under <scylla_data_path>/<db::schema_tables::NAME>.
+///
+/// A directory entry is matched to a table by the part of its name preceding
+/// the last '-'. Throws std::runtime_error if a matching entry is not known to
+/// be a directory, or if not every table has a matching entry.
+/// Waits on futures with get(), so it has to be called in a seastar thread.
+std::unordered_map<schema_ptr, std::filesystem::path> get_schema_table_directories(std::filesystem::path scylla_data_path) {
+    const std::vector<schema_ptr> schemas{
+        db::schema_tables::types(),
+        db::schema_tables::tables(),
+        db::schema_tables::columns(),
+        db::schema_tables::view_virtual_columns(),
+        db::schema_tables::computed_columns(),
+        db::schema_tables::indexes(),
+        db::schema_tables::dropped_columns(),
+        db::schema_tables::scylla_tables()};
+
+    std::unordered_map<schema_ptr, std::filesystem::path> schema_table_table_dir;
+
+    auto schema_tables_path = scylla_data_path / db::schema_tables::NAME;
+    auto schema_tables_dir = open_directory(schema_tables_path.native()).get();
+
+    schema_tables_dir.list_directory([&] (directory_entry de) -> future<> {
+        auto dash_pos = de.name.find_last_of('-');
+        auto table_name = de.name.substr(0, dash_pos);
+
+        auto it = boost::find_if(schemas, [&] (const schema_ptr& s) {
+            return s->cf_name() == table_name;
+        });
+
+        if (it != schemas.end()) {
+            if (!de.type) {
+                throw std::runtime_error(fmt::format("failed loading schema tables from {}: keyspace directory entry {} has unrecognized type", scylla_data_path.native(), de.name));
+            } else if (*de.type != directory_entry_type::directory) {
+                throw std::runtime_error(fmt::format("failed loading schema tables from {}: keyspace directory entry {} has unrecognized type {}", scylla_data_path.native(), de.name, static_cast<int>(*de.type)));
+            }
+            auto s = *it;
+            schema_table_table_dir[s] = de.name;
+        }
+        return make_ready_future<>();
+    }).done().get();
+
+    if (schema_table_table_dir.size() != schemas.size()) {
+        throw std::runtime_error(fmt::format("failed loading schema tables from {}: couldn't find table directory for all required schema tables", scylla_data_path.native()));
+    }
+
+    return schema_table_table_dir;
+}
+
+/// Loads the schema of \p keyspace.\p table from the schema table sstables
+/// found under \p scylla_data_path.
+///
+/// Throws std::runtime_error if the table has no entry in the 'tables' or
+/// 'columns' schema tables. Has to be called in a seastar thread.
+schema_ptr do_load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
+    reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__);
+    auto stop_semaphore = deferred_stop(rcs_sem);
+
+    sharded<sstable_manager_service> sst_man;
+    sst_man.start().get();
+    auto stop_sst_man_service = deferred_stop(sst_man);
+
+    auto schema_table_table_dir = get_schema_table_directories(scylla_data_path);
+    auto schema_tables_path = scylla_data_path / db::schema_tables::NAME;
+
+    auto do_load = [&] (std::function<schema_ptr()> schema_factory) {
+        auto s = schema_factory();
+        return read_schema_table_mutation(
+                sst_man,
+                schema_tables_path / schema_table_table_dir[s],
+                schema_factory,
+                rcs_sem.make_tracking_only_permit(s.get(), "schema_mutation", db::no_timeout, {}),
+                keyspace,
+                {table});
+    };
+    mutation_opt tables = do_load(db::schema_tables::tables);
+    mutation_opt columns = do_load(db::schema_tables::columns);
+    mutation_opt view_virtual_columns = do_load(db::schema_tables::view_virtual_columns);
+    mutation_opt computed_columns = do_load(db::schema_tables::computed_columns);
+    mutation_opt indexes = do_load(db::schema_tables::indexes);
+    mutation_opt dropped_columns = do_load(db::schema_tables::dropped_columns);
+    mutation_opt scylla_tables = do_load([] () { return db::schema_tables::scylla_tables(); });
+
+    if (!tables || !columns) {
+        throw std::runtime_error(fmt::format("Failed to find {}.{} in 'tables' and/or 'columns' schema tables", keyspace, table));
+    }
+
+    data_dictionary::user_types_metadata utm;
+
+    auto types_mut = read_schema_table_mutation(
+            sst_man,
+            schema_tables_path / schema_table_table_dir[db::schema_tables::types()],
+            db::schema_tables::types,
+            rcs_sem.make_tracking_only_permit(db::schema_tables::types().get(), "types_mutation", db::no_timeout, {}),
+            keyspace,
+            {});
+    if (types_mut) {
+        query::result_set result(*types_mut);
+
+        auto ks = make_lw_shared<data_dictionary::keyspace_metadata>(keyspace, "org.apache.cassandra.locator.LocalStrategy", std::map<sstring, sstring>{}, false);
+        db::cql_type_parser::raw_builder ut_builder(*ks);
+
+        auto get_list = [] (const query::result_set_row& row, const char* name) {
+            return boost::copy_range<std::vector<sstring>>(
+                    row.get_nonnull<const list_type_impl::native_type&>(name)
+                    | boost::adaptors::transformed([] (const data_value& v) { return value_cast<sstring>(v); }));
+        };
+
+        for (const auto& row : result.rows()) {
+            const auto name = row.get_nonnull<sstring>("type_name");
+            const auto field_names = get_list(row, "field_names");
+            const auto field_types = get_list(row, "field_types");
+            ut_builder.add(name, field_names, field_types);
+        }
+
+        for (auto&& ut : ut_builder.build()) {
+            utm.add_type(std::move(ut));
+        }
+    }
+
+    db::config dbcfg;
+    auto user_type_storage = std::make_shared<single_keyspace_user_types_storage>(std::move(utm));
+    db::schema_ctxt ctxt(dbcfg, user_type_storage);
+    schema_mutations muts(std::move(*tables), std::move(*columns), std::move(view_virtual_columns), std::move(computed_columns), std::move(indexes),
+            std::move(dropped_columns), std::move(scylla_tables));
+    return db::schema_tables::create_table_from_mutations(ctxt, muts);
+}
+
 } // anonymous namespace
 
 namespace tools {
@@ -362,4 +608,12 @@ schema_ptr load_system_schema(std::string_view keyspace, std::string_view table)
     return *tb_it;
 }
 
+future<schema_ptr> load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
+    // keyspace and table are non-owning views; the thread started by async() may
+    // still be running after the caller's strings are gone, so give it copies.
+    return async([scylla_data_path = std::move(scylla_data_path), keyspace = sstring(keyspace), table = sstring(table)] () mutable {
+        return do_load_schema_from_schema_tables(std::move(scylla_data_path), keyspace, table);
+    });
+}
+
 } // namespace tools
diff --git a/tools/schema_loader.hh b/tools/schema_loader.hh
index 79b5e0b0e1..24c1368fb5 100644
--- a/tools/schema_loader.hh
+++ b/tools/schema_loader.hh
@@ -49,4 +49,15 @@ future<schema_ptr> load_one_schema_from_file(std::filesystem::path path);
 /// all schema and experimental features enabled.
 schema_ptr load_system_schema(std::string_view keyspace, std::string_view table);
 
+/// Load the schema of the table with the designated keyspace and table name,
+/// from the system schema table sstables.
+///
+/// The schema table sstables are accessed for read only. In general this method
+/// tries very hard to have no side-effects.
+/// The \p scylla_data_path parameter is expected to point to the scylla data
+/// directory, which is usually /var/lib/scylla/data.
+/// The returned future fails with std::runtime_error if a required schema
+/// table directory is missing or if the table is not found in the schema tables.
+future<schema_ptr> load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table);
+
 } // namespace tools