replica: distributed_loader: coroutinize make_sstables_available

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-05-19 12:50:23 +03:00
parent 868cea21e0
commit b3ebbf35e2

View File

@@ -8,6 +8,7 @@
#include <seastar/core/coroutine.hh> #include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh> #include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh> #include <seastar/util/closeable.hh>
#include "distributed_loader.hh" #include "distributed_loader.hh"
#include "replica/database.hh" #include "replica/database.hh"
@@ -258,43 +259,32 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
sharded<db::view::view_update_generator>& view_update_generator, fs::path datadir, sstring ks, sstring cf) { sharded<db::view::view_update_generator>& view_update_generator, fs::path datadir, sstring ks, sstring cf) {
auto& table = db.local().find_column_family(ks, cf); auto& table = db.local().find_column_family(ks, cf);
auto new_sstables = std::vector<sstables::shared_sstable>();
return do_with(std::vector<sstables::shared_sstable>(), // FIXME indentation
[&table, &dir, &view_update_generator, datadir = std::move(datadir)] (std::vector<sstables::shared_sstable>& new_sstables) { co_await dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) -> future<> {
return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) {
auto gen = table.calculate_generation_for_new_table(); auto gen = table.calculate_generation_for_new_table();
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen); dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, &new_sstables, sst] { co_await sst->move_to_new_dir(datadir.native(), gen, true);
// When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels. // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
sst->set_sstable_level(0); sst->set_sstable_level(0);
new_sstables.push_back(std::move(sst)); new_sstables.push_back(std::move(sst));
return make_ready_future<>(); });
});
}).then([&table, &new_sstables] {
// nothing loaded // nothing loaded
if (new_sstables.empty()) { if (new_sstables.empty()) {
return make_ready_future<>(); co_return 0;
} }
return table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) { co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep); dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep);
abort(); abort();
}); });
}).then([&view_update_generator, &table, &new_sstables] { co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> {
return parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
if (sst->requires_view_building()) { if (sst->requires_view_building()) {
return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
} }
return make_ready_future<>();
}); });
}).then_wrapped([&new_sstables] (future<> f) { co_return new_sstables.size();
if (!f.failed()) {
return make_ready_future<size_t>(new_sstables.size());
} else {
return make_exception_future<size_t>(f.get_exception());
}
});
});
} }
future<> future<>