Merge 'Rewrap keyspace population loop' from Pavel Emelyanov

Population of non-system keyspaces is currently done by listing the data directories and assuming that each subdirectory found is a keyspace. The same is done for S3-backed keyspaces, which is a bug (#13020). The loop needs to walk the list of known keyspaces instead, and locate each keyspace's storage later, based on its storage options.

Closes scylladb/scylladb#15436

* github.com:scylladb/scylladb:
  distributed_loader: Indentation fix after previous patch
  distributed_loader: Generalize datadir parallelizm loop
  distributed_loader: Provide keyspace ref to populate_keyspace
  distributed_loader: Walk list of keyspaces instead of directories
This commit is contained in:
Avi Kivity
2023-09-18 20:51:01 +03:00
2 changed files with 23 additions and 40 deletions

View File

@@ -459,17 +459,18 @@ future<> table_populator::populate_subdir(sstables::sstable_state state, allow_o
});
}
future<> distributed_loader::populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name) {
// Populates keyspace `ks_name` from all configured data directories.
// Invokes the per-directory populate_keyspace() overload once for each entry
// of the config's data_file_directories(), driven by parallel_for_each, and
// returns the future produced by that call.
future<> distributed_loader::populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring ks_name) {
// might have more than one dir for a keyspace iff data_file_directories is > 1 and
// somehow someone placed sstables in more than one of them for a given ks. (import?)
// The lambda captures db and ks by reference and ks_name by copy.
return parallel_for_each(db.local().get_config().data_file_directories(), [&db, &ks, ks_name] (const sstring& data_dir) {
return populate_keyspace(db, ks, data_dir, ks_name);
});
}
future<> distributed_loader::populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring datadir, sstring ks_name) {
auto ksdir = datadir + "/" + ks_name;
auto& keyspaces = db.local().get_keyspaces();
auto i = keyspaces.find(ks_name);
if (i == keyspaces.end()) {
dblog.warn("Skipping undefined keyspace: {}", ks_name);
co_return;
}
dblog.info("Populating Keyspace {}", ks_name);
auto& ks = i->second;
auto& tables_metadata = db.local().get_tables_metadata();
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> {
@@ -518,10 +519,11 @@ future<> distributed_loader::init_system_keyspace(sharded<db::system_keyspace>&
return sys_ks.make(erm_factory.local(), db.local());
}).get();
const auto& cfg = db.local().get_config();
for (auto& data_dir : cfg.data_file_directories()) {
for (auto ksname : system_keyspaces) {
distributed_loader::populate_keyspace(db, data_dir, sstring(ksname)).get();
for (auto ksname : system_keyspaces) {
auto& ks = db.local().get_keyspaces();
auto i = ks.find(ksname);
if (i != ks.end()) {
distributed_loader::populate_keyspace(db, i->second, sstring(ksname)).get();
}
}
});
@@ -535,30 +537,16 @@ future<> distributed_loader::init_non_system_keyspaces(distributed<replica::data
}).get();
const auto& cfg = db.local().get_config();
using ks_dirs = std::unordered_multimap<sstring, sstring>;
ks_dirs dirs;
parallel_for_each(cfg.data_file_directories(), [&dirs] (sstring directory) {
// we want to collect the directories first, so we can get a full set of potential dirs
return lister::scan_dir(fs::path{directory}, lister::dir_entry_types::of<directory_entry_type::directory>(),
[&dirs] (fs::path datadir, directory_entry de) {
if (!is_system_keyspace(de.name)) {
dirs.emplace(de.name, datadir.native());
}
return make_ready_future<>();
});
}).get();
for (bool prio_only : { true, false}) {
std::vector<future<>> futures;
// treat "dirs" as immutable to avoid modifying it while still in
// a range-iteration. Also to simplify the "finally"
for (auto i = dirs.begin(); i != dirs.end();) {
auto& ks_name = i->first;
auto j = i++;
for (auto& ks : db.local().get_keyspaces()) {
auto& ks_name = ks.first;
if (is_system_keyspace(ks_name)) {
continue;
}
/**
* Must process in two phases: Prio and non-prio.
@@ -570,13 +558,7 @@ future<> distributed_loader::init_non_system_keyspaces(distributed<replica::data
continue;
}
auto e = dirs.equal_range(ks_name).second;
// might have more than one dir for a keyspace iff data_file_directories is > 1 and
// somehow someone placed sstables in more than one of them for a given ks. (import?)
futures.emplace_back(parallel_for_each(j, e, [&](const std::pair<sstring, sstring>& p) {
auto& datadir = p.second;
return distributed_loader::populate_keyspace(db, datadir, ks_name);
}));
futures.emplace_back(distributed_loader::populate_keyspace(db, ks.second, ks_name));
}
when_all_succeed(futures.begin(), futures.end()).discard_result().get();

View File

@@ -73,7 +73,8 @@ class distributed_loader {
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
bool needs_view_update, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
static future<> populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring ks_name);
static future<> populate_keyspace(distributed<replica::database>& db, keyspace& ks, sstring datadir, sstring ks_name);
public:
static future<> init_system_keyspace(sharded<db::system_keyspace>&, distributed<locator::effective_replication_map_factory>&, distributed<replica::database>&);