diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 18ab981449..c5e376b186 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -626,6 +626,10 @@ public: return _highest_generation; } + using allow_offstrategy_compaction = bool_class; + using must_exist = bool_class; + future<> populate_column_family(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); + private: future<> start_subdir(sstring subdir); }; @@ -684,7 +688,8 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab return table.get_sstables_manager().make_sstable(table.schema(), dir.native(), generation, v, sstables::sstable_format_types::big); } -future<> distributed_loader::populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { +future<> table_population_metadata::populate_column_family(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { + auto& metadata = *this; auto& db = metadata.db(); const auto& ks = metadata.ks(); const auto& cf = metadata.cf(); @@ -707,7 +712,7 @@ future<> distributed_loader::populate_column_family(table_population_metadata& m auto& directory = *metadata.sstable_directories().at(subdir); auto sst_version = metadata.highest_version(); - co_await reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { + co_await distributed_loader::reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { auto gen = smp::submit_to(shard, [&global_table] () { return global_table->calculate_generation_for_new_table(); }).get0(); @@ -727,7 +732,7 @@ future<> distributed_loader::populate_column_family(table_population_metadata& m return sst->get_origin() != sstables::repair_origin; }; - co_await reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { + co_await distributed_loader::reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { auto gen = global_table->calculate_generation_for_new_table(); return make_sstable(*global_table, sstdir, gen, sst_version); }, eligible_for_reshape_on_boot, default_priority_class()); @@ -770,9 +775,9 @@ future<> distributed_loader::populate_keyspace(distributed& d co_await ks.make_directory_for_column_family(cfname, uuid); co_await metadata.start(); - co_await distributed_loader::populate_column_family(metadata, sstables::staging_dir, allow_offstrategy_compaction::no); - co_await distributed_loader::populate_column_family(metadata, sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no); - co_await distributed_loader::populate_column_family(metadata, "", allow_offstrategy_compaction::yes); + co_await metadata.populate_column_family(sstables::staging_dir, table_population_metadata::allow_offstrategy_compaction::no); + co_await metadata.populate_column_family(sstables::quarantine_dir, table_population_metadata::allow_offstrategy_compaction::no, table_population_metadata::must_exist::no); + co_await metadata.populate_column_family("", table_population_metadata::allow_offstrategy_compaction::yes); } catch (...) { std::exception_ptr eptr = std::current_exception(); std::string msg = diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index f3904ac945..d47b1d5646 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -75,9 +75,6 @@ class distributed_loader { static future make_sstables_available(sstables::sstable_directory& dir, sharded& db, sharded& view_update_generator, std::filesystem::path datadir, sstring ks, sstring cf); - using allow_offstrategy_compaction = bool_class; - using must_exist = bool_class; - static future<> populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);