Merge 'Rewrap keyspace population loop' from Pavel Emelyanov
Populating of non-system keyspaces is now done by listing datadirs and assuming that each subdir found is a keyspace. For S3-backed keyspaces this is also true, but it's a bug (#13020). The loop needs to walk the list of known keyspaces instead, and try to find the keyspace storage later, based on the storage option. Closes scylladb/scylladb#15436 * github.com:scylladb/scylladb: distributed_loader: Indentation fix after previous patch distributed_loader: Generalize datadir parallelizm loop distributed_loader: Provide keyspace ref to populate_keyspace distributed_loader: Walk list of keyspaces instead of directories
This commit is contained in:
@@ -459,17 +459,18 @@ future<> table_populator::populate_subdir(sstables::sstable_state state, allow_o
|
||||
});
|
||||
}
|
||||
|
||||
future<> distributed_loader::populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name) {
|
||||
future<> distributed_loader::populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring ks_name) {
|
||||
// might have more than one dir for a keyspace iff data_file_directories is > 1 and
|
||||
// somehow someone placed sstables in more than one of them for a given ks. (import?)
|
||||
return parallel_for_each(db.local().get_config().data_file_directories(), [&db, &ks, ks_name] (const sstring& data_dir) {
|
||||
return populate_keyspace(db, ks, data_dir, ks_name);
|
||||
});
|
||||
}
|
||||
|
||||
future<> distributed_loader::populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring datadir, sstring ks_name) {
|
||||
auto ksdir = datadir + "/" + ks_name;
|
||||
auto& keyspaces = db.local().get_keyspaces();
|
||||
auto i = keyspaces.find(ks_name);
|
||||
if (i == keyspaces.end()) {
|
||||
dblog.warn("Skipping undefined keyspace: {}", ks_name);
|
||||
co_return;
|
||||
}
|
||||
|
||||
dblog.info("Populating Keyspace {}", ks_name);
|
||||
auto& ks = i->second;
|
||||
auto& tables_metadata = db.local().get_tables_metadata();
|
||||
|
||||
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> {
|
||||
@@ -518,10 +519,11 @@ future<> distributed_loader::init_system_keyspace(sharded<db::system_keyspace>&
|
||||
return sys_ks.make(erm_factory.local(), db.local());
|
||||
}).get();
|
||||
|
||||
const auto& cfg = db.local().get_config();
|
||||
for (auto& data_dir : cfg.data_file_directories()) {
|
||||
for (auto ksname : system_keyspaces) {
|
||||
distributed_loader::populate_keyspace(db, data_dir, sstring(ksname)).get();
|
||||
for (auto ksname : system_keyspaces) {
|
||||
auto& ks = db.local().get_keyspaces();
|
||||
auto i = ks.find(ksname);
|
||||
if (i != ks.end()) {
|
||||
distributed_loader::populate_keyspace(db, i->second, sstring(ksname)).get();
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -535,30 +537,16 @@ future<> distributed_loader::init_non_system_keyspaces(distributed<replica::data
|
||||
}).get();
|
||||
|
||||
const auto& cfg = db.local().get_config();
|
||||
using ks_dirs = std::unordered_multimap<sstring, sstring>;
|
||||
|
||||
ks_dirs dirs;
|
||||
|
||||
parallel_for_each(cfg.data_file_directories(), [&dirs] (sstring directory) {
|
||||
// we want to collect the directories first, so we can get a full set of potential dirs
|
||||
return lister::scan_dir(fs::path{directory}, lister::dir_entry_types::of<directory_entry_type::directory>(),
|
||||
[&dirs] (fs::path datadir, directory_entry de) {
|
||||
if (!is_system_keyspace(de.name)) {
|
||||
dirs.emplace(de.name, datadir.native());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
|
||||
|
||||
for (bool prio_only : { true, false}) {
|
||||
std::vector<future<>> futures;
|
||||
|
||||
// treat "dirs" as immutable to avoid modifying it while still in
|
||||
// a range-iteration. Also to simplify the "finally"
|
||||
for (auto i = dirs.begin(); i != dirs.end();) {
|
||||
auto& ks_name = i->first;
|
||||
auto j = i++;
|
||||
for (auto& ks : db.local().get_keyspaces()) {
|
||||
auto& ks_name = ks.first;
|
||||
|
||||
if (is_system_keyspace(ks_name)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Must process in two phases: Prio and non-prio.
|
||||
@@ -570,13 +558,7 @@ future<> distributed_loader::init_non_system_keyspaces(distributed<replica::data
|
||||
continue;
|
||||
}
|
||||
|
||||
auto e = dirs.equal_range(ks_name).second;
|
||||
// might have more than one dir for a keyspace iff data_file_directories is > 1 and
|
||||
// somehow someone placed sstables in more than one of them for a given ks. (import?)
|
||||
futures.emplace_back(parallel_for_each(j, e, [&](const std::pair<sstring, sstring>& p) {
|
||||
auto& datadir = p.second;
|
||||
return distributed_loader::populate_keyspace(db, datadir, ks_name);
|
||||
}));
|
||||
futures.emplace_back(distributed_loader::populate_keyspace(db, ks.second, ks_name));
|
||||
}
|
||||
|
||||
when_all_succeed(futures.begin(), futures.end()).discard_result().get();
|
||||
|
||||
@@ -73,7 +73,8 @@ class distributed_loader {
|
||||
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
|
||||
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
|
||||
bool needs_view_update, sstring ks, sstring cf);
|
||||
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
|
||||
static future<> populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring ks_name);
|
||||
static future<> populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring datadir, sstring ks_name);
|
||||
|
||||
public:
|
||||
static future<> init_system_keyspace(sharded<db::system_keyspace>&, distributed<locator::effective_replication_map_factory>&, distributed<replica::database>&);
|
||||
|
||||
Reference in New Issue
Block a user