replica: distributed_loader: coroutinize make_sstables_available
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "distributed_loader.hh"
|
||||
#include "replica/database.hh"
|
||||
@@ -258,43 +259,32 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
|
||||
sharded<db::view::view_update_generator>& view_update_generator, fs::path datadir, sstring ks, sstring cf) {
|
||||
|
||||
auto& table = db.local().find_column_family(ks, cf);
|
||||
auto new_sstables = std::vector<sstables::shared_sstable>();
|
||||
|
||||
return do_with(std::vector<sstables::shared_sstable>(),
|
||||
[&table, &dir, &view_update_generator, datadir = std::move(datadir)] (std::vector<sstables::shared_sstable>& new_sstables) {
|
||||
return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) {
|
||||
// FIXME indentation
|
||||
co_await dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) -> future<> {
|
||||
auto gen = table.calculate_generation_for_new_table();
|
||||
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
|
||||
return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, &new_sstables, sst] {
|
||||
co_await sst->move_to_new_dir(datadir.native(), gen, true);
|
||||
// When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
|
||||
sst->set_sstable_level(0);
|
||||
new_sstables.push_back(std::move(sst));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([&table, &new_sstables] {
|
||||
});
|
||||
// nothing loaded
|
||||
if (new_sstables.empty()) {
|
||||
return make_ready_future<>();
|
||||
co_return 0;
|
||||
}
|
||||
|
||||
return table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
|
||||
co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
|
||||
dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep);
|
||||
abort();
|
||||
});
|
||||
}).then([&view_update_generator, &table, &new_sstables] {
|
||||
return parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
|
||||
co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst->requires_view_building()) {
|
||||
return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
|
||||
co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then_wrapped([&new_sstables] (future<> f) {
|
||||
if (!f.failed()) {
|
||||
return make_ready_future<size_t>(new_sstables.size());
|
||||
} else {
|
||||
return make_exception_future<size_t>(f.get_exception());
|
||||
}
|
||||
});
|
||||
});
|
||||
co_return new_sstables.size();
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
Reference in New Issue
Block a user