From b3ebbf35e2ef0c48d559b77fcd784f857aff6e88 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 19 May 2022 12:50:23 +0300 Subject: [PATCH] replica: distributed_loader: coroutinize make_sstables_available Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 87426e9e40..8873dfc65d 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -8,6 +8,7 @@ #include #include +#include #include #include "distributed_loader.hh" #include "replica/database.hh" @@ -258,43 +259,32 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh sharded& view_update_generator, fs::path datadir, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); + auto new_sstables = std::vector(); - return do_with(std::vector(), - [&table, &dir, &view_update_generator, datadir = std::move(datadir)] (std::vector& new_sstables) { - return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) { + // FIXME indentation + co_await dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) -> future<> { auto gen = table.calculate_generation_for_new_table(); dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen); - return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, &new_sstables, sst] { + co_await sst->move_to_new_dir(datadir.native(), gen, true); // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels. sst->set_sstable_level(0); new_sstables.push_back(std::move(sst)); - return make_ready_future<>(); - }); - }).then([&table, &new_sstables] { + }); // nothing loaded if (new_sstables.empty()) { - return make_ready_future<>(); + co_return 0; } - return table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) { + co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) { dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep); abort(); }); - }).then([&view_update_generator, &table, &new_sstables] { - return parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable& sst) { + co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> { if (sst->requires_view_building()) { - return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); + co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); } - return make_ready_future<>(); }); - }).then_wrapped([&new_sstables] (future<> f) { - if (!f.failed()) { - return make_ready_future(new_sstables.size()); - } else { - return make_exception_future(f.get_exception()); - } - }); - }); + co_return new_sstables.size(); } future<>