mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
tools/schema_loader: add load_schema_from_schema_tables()
Allows loading the schema for the designated keyspace and table, from the system table sstables located on disk. The sstable files opened for read only.
This commit is contained in:
@@ -23,10 +23,15 @@
|
||||
#include "db/cql_type_parser.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "readers/combined.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "types/list.hh"
|
||||
#include "data_dictionary/impl.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
@@ -319,6 +324,218 @@ std::vector<schema_ptr> do_load_schemas(std::string_view schema_str) {
|
||||
return schemas;
|
||||
}
|
||||
|
||||
struct sstable_manager_service {
|
||||
db::nop_large_data_handler large_data_handler;
|
||||
db::config dbcfg;
|
||||
gms::feature_service feature_service;
|
||||
cache_tracker tracker;
|
||||
sstables::directory_semaphore dir_sem;
|
||||
sstables::sstables_manager sst_man;
|
||||
|
||||
explicit sstable_manager_service()
|
||||
: feature_service(gms::feature_config_from_db_config(dbcfg))
|
||||
, dir_sem(1)
|
||||
, sst_man(large_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem) {
|
||||
}
|
||||
|
||||
future<> stop() {
|
||||
return sst_man.close();
|
||||
}
|
||||
};
|
||||
|
||||
mutation_opt read_schema_table_mutation(sharded<sstable_manager_service>& sst_man, std::filesystem::path schema_table_data_path,
|
||||
std::function<schema_ptr()> schema_factory, reader_permit permit, std::string_view keyspace, std::vector<std::string_view> ck_strings) {
|
||||
|
||||
sharded<sstables::sstable_directory> sst_dirs;
|
||||
sst_dirs.start(
|
||||
sharded_parameter([&sst_man] { return std::ref(sst_man.local().sst_man); }),
|
||||
sharded_parameter([&schema_factory] { return schema_factory(); }),
|
||||
sharded_parameter([] { return make_lw_shared<const data_dictionary::storage_options>(); }),
|
||||
schema_table_data_path,
|
||||
sharded_parameter([] { return default_priority_class(); }),
|
||||
sharded_parameter([] { return default_io_error_handler_gen(); })).get();
|
||||
auto stop_sst_dirs = deferred_stop(sst_dirs);
|
||||
|
||||
auto sstable_open_infos = sst_dirs.map_reduce0(
|
||||
[] (sstables::sstable_directory& sst_dir) -> future<std::vector<sstables::foreign_sstable_open_info>> {
|
||||
co_await sst_dir.process_sstable_dir(sstables::sstable_directory::process_flags{ .sort_sstables_according_to_owner = false });
|
||||
const auto& unsorted_ssts = sst_dir.get_unsorted_sstables();
|
||||
std::vector<sstables::foreign_sstable_open_info> open_infos;
|
||||
open_infos.reserve(unsorted_ssts.size());
|
||||
for (auto& sst : unsorted_ssts) {
|
||||
open_infos.push_back(co_await sst->get_open_info());
|
||||
}
|
||||
co_return open_infos;
|
||||
},
|
||||
std::vector<sstables::foreign_sstable_open_info>{},
|
||||
[] (std::vector<sstables::foreign_sstable_open_info> a, std::vector<sstables::foreign_sstable_open_info> b) {
|
||||
std::move(b.begin(), b.end(), std::back_inserter(a));
|
||||
return a;
|
||||
}).get();
|
||||
|
||||
auto schema_table_schema = schema_factory();
|
||||
|
||||
if (sstable_open_infos.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(sstable_open_infos.size());
|
||||
for (auto& open_info : sstable_open_infos) {
|
||||
sstables.push_back(sst_dirs.local().load_foreign_sstable(open_info).get());
|
||||
}
|
||||
|
||||
auto pk = partition_key::from_deeply_exploded(*schema_table_schema, {data_value(keyspace)});
|
||||
auto dk = dht::decorate_key(*schema_table_schema, pk);
|
||||
auto pr = dht::partition_range::make_singular(dk);
|
||||
|
||||
std::vector<data_value> raw_ck_values;
|
||||
raw_ck_values.reserve(ck_strings.size());
|
||||
for (const auto& ck_str : ck_strings) {
|
||||
raw_ck_values.push_back(data_value(ck_str));
|
||||
}
|
||||
auto ck = clustering_key::from_deeply_exploded(*schema_table_schema, raw_ck_values);
|
||||
auto cr = query::clustering_range::make({ck, true}, {ck, true});
|
||||
auto ps = partition_slice_builder(*schema_table_schema)
|
||||
.with_range(cr)
|
||||
.build();
|
||||
|
||||
std::vector<flat_mutation_reader_v2> readers;
|
||||
readers.reserve(sstables.size());
|
||||
for (const auto& sst : sstables) {
|
||||
readers.emplace_back(sst->make_reader(schema_table_schema, permit, pr, ps));
|
||||
}
|
||||
auto reader = make_combined_reader(schema_table_schema, permit, std::move(readers));
|
||||
|
||||
return read_mutation_from_flat_mutation_reader(reader).get();
|
||||
}
|
||||
|
||||
class single_keyspace_user_types_storage : public data_dictionary::user_types_storage {
|
||||
data_dictionary::user_types_metadata _utm;
|
||||
public:
|
||||
single_keyspace_user_types_storage(data_dictionary::user_types_metadata utm) : _utm(std::move(utm)) { }
|
||||
virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override {
|
||||
return _utm;
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_map<schema_ptr, std::string> get_schema_table_directories(std::filesystem::path scylla_data_path) {
|
||||
const std::vector<schema_ptr> schemas{
|
||||
db::schema_tables::types(),
|
||||
db::schema_tables::tables(),
|
||||
db::schema_tables::columns(),
|
||||
db::schema_tables::view_virtual_columns(),
|
||||
db::schema_tables::computed_columns(),
|
||||
db::schema_tables::indexes(),
|
||||
db::schema_tables::dropped_columns(),
|
||||
db::schema_tables::scylla_tables()};
|
||||
|
||||
std::unordered_map<schema_ptr, std::string> schema_table_table_dir;
|
||||
|
||||
auto schema_tables_path = scylla_data_path / db::schema_tables::NAME;
|
||||
auto schema_tables_dir = open_directory(schema_tables_path.native()).get();
|
||||
|
||||
schema_tables_dir.list_directory([&] (directory_entry de) -> future<> {
|
||||
auto dash_pos = de.name.find_last_of('-');
|
||||
auto table_name = de.name.substr(0, dash_pos);
|
||||
|
||||
auto it = boost::find_if(schemas, [&] (const schema_ptr& s) {
|
||||
return s->cf_name() == table_name;
|
||||
});
|
||||
|
||||
if (it != schemas.end()) {
|
||||
if (!de.type) {
|
||||
throw std::runtime_error(fmt::format("failed loading schema tables from {}: keyspace directory entry {} has unrecognized type", scylla_data_path.native(), de.name));
|
||||
} else if (*de.type != directory_entry_type::directory) {
|
||||
throw std::runtime_error(fmt::format("failed loading schema tables from {}: keyspace directory entry {} has unrecognized type {}", scylla_data_path.native(), de.name, static_cast<int>(*de.type)));
|
||||
}
|
||||
auto s = *it;
|
||||
schema_table_table_dir[s] = de.name;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).done().get();
|
||||
|
||||
if (schema_table_table_dir.size() != schemas.size()) {
|
||||
throw std::runtime_error(fmt::format("failed loading schema tables from {}: couldn't find table directory for all require schema tables", scylla_data_path.native()));
|
||||
}
|
||||
|
||||
return schema_table_table_dir;
|
||||
}
|
||||
|
||||
schema_ptr do_load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
|
||||
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__);
|
||||
auto stop_semaphore = deferred_stop(rcs_sem);
|
||||
|
||||
sharded<sstable_manager_service> sst_man;
|
||||
sst_man.start().get();
|
||||
auto stop_sst_man_service = deferred_stop(sst_man);
|
||||
|
||||
auto schema_table_table_dir = get_schema_table_directories(scylla_data_path);
|
||||
auto schema_tables_path = scylla_data_path / db::schema_tables::NAME;
|
||||
|
||||
auto do_load = [&] (std::function<const schema_ptr()> schema_factory) {
|
||||
auto s = schema_factory();
|
||||
return read_schema_table_mutation(
|
||||
sst_man,
|
||||
schema_tables_path / schema_table_table_dir[s],
|
||||
schema_factory,
|
||||
rcs_sem.make_tracking_only_permit(s.get(), "schema_mutation", db::no_timeout, {}),
|
||||
keyspace,
|
||||
{table});
|
||||
};
|
||||
mutation_opt tables = do_load(db::schema_tables::tables);
|
||||
mutation_opt columns = do_load(db::schema_tables::columns);
|
||||
mutation_opt view_virtual_columns = do_load(db::schema_tables::view_virtual_columns);
|
||||
mutation_opt computed_columns = do_load(db::schema_tables::computed_columns);
|
||||
mutation_opt indexes = do_load(db::schema_tables::indexes);
|
||||
mutation_opt dropped_columns = do_load(db::schema_tables::dropped_columns);
|
||||
mutation_opt scylla_tables = do_load([] () { return db::schema_tables::scylla_tables(); });
|
||||
|
||||
if (!tables || !columns) {
|
||||
throw std::runtime_error(fmt::format("Failed to find {}.{} in 'tables' and/or 'columns' schema tables", keyspace, table));
|
||||
}
|
||||
|
||||
data_dictionary::user_types_metadata utm;
|
||||
|
||||
auto types_mut = read_schema_table_mutation(
|
||||
sst_man,
|
||||
schema_tables_path / schema_table_table_dir[db::schema_tables::types()],
|
||||
db::schema_tables::types,
|
||||
rcs_sem.make_tracking_only_permit(db::schema_tables::types().get(), "types_mutation", db::no_timeout, {}),
|
||||
keyspace,
|
||||
{});
|
||||
if (types_mut) {
|
||||
query::result_set result(*types_mut);
|
||||
|
||||
auto ks = make_lw_shared<keyspace_metadata>(keyspace, "org.apache.cassandra.locator.LocalStrategy", std::map<sstring, sstring>{}, false);
|
||||
db::cql_type_parser::raw_builder ut_builder(*ks);
|
||||
|
||||
auto get_list = [] (const query::result_set_row& row, const char* name) {
|
||||
return boost::copy_range<std::vector<sstring>>(
|
||||
row.get_nonnull<const list_type_impl::native_type&>(name)
|
||||
| boost::adaptors::transformed([] (const data_value& v) { return value_cast<sstring>(v); }));
|
||||
};
|
||||
|
||||
for (const auto& row : result.rows()) {
|
||||
const auto name = row.get_nonnull<sstring>("type_name");
|
||||
const auto field_names = get_list(row, "field_names");
|
||||
const auto field_types = get_list(row, "field_types");
|
||||
ut_builder.add(name, field_names, field_types);
|
||||
}
|
||||
|
||||
for (auto&& ut : ut_builder.build()) {
|
||||
utm.add_type(std::move(ut));
|
||||
}
|
||||
}
|
||||
|
||||
db::config dbcfg;
|
||||
auto user_type_storage = std::make_shared<single_keyspace_user_types_storage>(std::move(utm));
|
||||
db::schema_ctxt ctxt(dbcfg, user_type_storage);
|
||||
schema_mutations muts(std::move(*tables), std::move(*columns), std::move(view_virtual_columns), std::move(computed_columns), std::move(indexes),
|
||||
std::move(dropped_columns), std::move(scylla_tables));
|
||||
return db::schema_tables::create_table_from_mutations(ctxt, muts);
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
namespace tools {
|
||||
@@ -362,4 +579,10 @@ schema_ptr load_system_schema(std::string_view keyspace, std::string_view table)
|
||||
return *tb_it;
|
||||
}
|
||||
|
||||
future<schema_ptr> load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
|
||||
return async([=] () mutable {
|
||||
return do_load_schema_from_schema_tables(scylla_data_path, keyspace, table);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace tools
|
||||
|
||||
@@ -49,4 +49,13 @@ future<schema_ptr> load_one_schema_from_file(std::filesystem::path path);
|
||||
/// all schema and experimental features enabled.
|
||||
schema_ptr load_system_schema(std::string_view keyspace, std::string_view table);
|
||||
|
||||
/// Load the schema of the table with the designated keyspace and table name,
|
||||
/// from the system schema table sstables.
|
||||
///
|
||||
/// The schema table sstables are accessed for read only. In general this method
|
||||
/// tries very hard to have no side-effects.
|
||||
/// The \p scylla_data_path parameter is expected to point to the scylla data
|
||||
/// directory, which is usually /var/lib/scylla/data.
|
||||
future<schema_ptr> load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table);
|
||||
|
||||
} // namespace tools
|
||||
|
||||
Reference in New Issue
Block a user