replica: distributed_loader: coroutinize make_sstables_available

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-05-19 12:50:23 +03:00
parent 868cea21e0
commit b3ebbf35e2

View File

@@ -8,6 +8,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh>
#include "distributed_loader.hh"
#include "replica/database.hh"
@@ -258,43 +259,32 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
sharded<db::view::view_update_generator>& view_update_generator, fs::path datadir, sstring ks, sstring cf) {
auto& table = db.local().find_column_family(ks, cf);
auto new_sstables = std::vector<sstables::shared_sstable>();
return do_with(std::vector<sstables::shared_sstable>(),
[&table, &dir, &view_update_generator, datadir = std::move(datadir)] (std::vector<sstables::shared_sstable>& new_sstables) {
return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) {
// FIXME indentation
co_await dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) -> future<> {
auto gen = table.calculate_generation_for_new_table();
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, &new_sstables, sst] {
co_await sst->move_to_new_dir(datadir.native(), gen, true);
// When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
sst->set_sstable_level(0);
new_sstables.push_back(std::move(sst));
return make_ready_future<>();
});
}).then([&table, &new_sstables] {
});
// nothing loaded
if (new_sstables.empty()) {
return make_ready_future<>();
co_return 0;
}
return table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep);
abort();
});
}).then([&view_update_generator, &table, &new_sstables] {
return parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> {
if (sst->requires_view_building()) {
return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
}
return make_ready_future<>();
});
}).then_wrapped([&new_sstables] (future<> f) {
if (!f.failed()) {
return make_ready_future<size_t>(new_sstables.size());
} else {
return make_exception_future<size_t>(f.get_exception());
}
});
});
co_return new_sstables.size();
}
future<>