diff --git a/replica/database.hh b/replica/database.hh index aa74f8b272..bd273d5527 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -274,7 +274,7 @@ using sstable_list = sstables::sstable_list; namespace replica { class distributed_loader; -struct table_population_metadata; +class table_populator; // The CF has a "stats" structure. But we don't want all fields here, // since some of them are fairly complex for exporting to collectd. Also, @@ -1094,7 +1094,7 @@ public: friend class ::column_family_test; friend class distributed_loader; - friend class table_population_metadata; + friend class table_populator; private: timer<> _off_strategy_trigger; diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 18ab981449..d4e9c64634 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -547,7 +547,7 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele co_await when_all_succeed(futures.begin(), futures.end()).discard_result(); } -class table_population_metadata { +class table_populator { distributed& _db; sstring _ks; sstring _cf; @@ -558,7 +558,7 @@ class table_population_metadata { sstables::generation_type _highest_generation = sstables::generation_from_value(0); public: - table_population_metadata(distributed& db, sstring ks, sstring cf) + table_populator(distributed& db, sstring ks, sstring cf) : _db(db) , _ks(std::move(ks)) , _cf(std::move(cf)) @@ -566,9 +566,9 @@ public: , _base_path(_global_table->dir()) {} - ~table_population_metadata() { + ~table_populator() { // All directories must have been stopped - // using table_population_metadata::stop() + // using table_populator::stop() assert(_sstable_directories.empty()); } @@ -582,6 +582,10 @@ public: _global_table->update_sstables_known_generation(_highest_generation); return _global_table->disable_auto_compaction(); }); + + co_await populate_subdir(sstables::staging_dir, allow_offstrategy_compaction::no); + co_await populate_subdir(sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no); + co_await populate_subdir("", allow_offstrategy_compaction::yes); } future<> stop() { @@ -590,47 +594,19 @@ public: } } +private: fs::path get_path(std::string_view subdir) { return subdir.empty() ? _base_path : _base_path / subdir; } - distributed& db() noexcept { - return _db; - } + using allow_offstrategy_compaction = bool_class; + using must_exist = bool_class; + future<> populate_subdir(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); - const sstring& ks() const noexcept { - return _ks; - } - - const sstring& cf() const noexcept { - return _cf; - } - - global_column_family_ptr& global_table() noexcept { - return _global_table; - }; - - const global_column_family_ptr& global_table() const noexcept { - return _global_table; - }; - - const std::unordered_map>>& sstable_directories() const noexcept { - return _sstable_directories; - } - - sstables::sstable::version_types highest_version() const noexcept { - return _highest_version; - } - - sstables::generation_type highest_generation() const noexcept { - return _highest_generation; - } - -private: future<> start_subdir(sstring subdir); }; -future<> table_population_metadata::start_subdir(sstring subdir) { +future<> table_populator::start_subdir(sstring subdir) { sstring sstdir = get_path(subdir).native(); if (!co_await file_exists(sstdir)) { co_return; @@ -655,7 +631,7 @@ future<> table_population_metadata::start_subdir(sstring subdir) { default_io_error_handler_gen() ); - // directory must be stopped using table_population_metadata::stop below + // directory must be stopped using table_populator::stop below _sstable_directories[subdir] = dptr; co_await distributed_loader::lock_table(directory, _db, _ks, _cf); @@ -684,35 +660,25 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab return table.get_sstables_manager().make_sstable(table.schema(), dir.native(), generation, v, sstables::sstable_format_types::big); } -future<> distributed_loader::populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { - auto& db = metadata.db(); - const auto& ks = metadata.ks(); - const auto& cf = metadata.cf(); - auto sstdir = metadata.get_path(subdir); - dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", ks, cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); +future<> table_populator::populate_subdir(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { + auto sstdir = get_path(subdir); + dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); - assert(this_shard_id() == 0); - - if (!co_await file_exists(sstdir.native())) { + if (!_sstable_directories.contains(subdir)) { if (dir_must_exist) { - throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", metadata.ks(), metadata.cf(), sstdir)); + throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", _ks, _cf, sstdir)); } co_return; } - auto& global_table = metadata.global_table(); - if (!metadata.sstable_directories().contains(subdir)) { - dblog.error("Could not find sstables directory {}.{}/{}", ks, cf, subdir); - } - auto& directory = *metadata.sstable_directories().at(subdir); - auto sst_version = metadata.highest_version(); + auto& directory = *_sstable_directories.at(subdir); - co_await reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { - auto gen = smp::submit_to(shard, [&global_table] () { - return global_table->calculate_generation_for_new_table(); + co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, sstdir] (shard_id shard) mutable { + auto gen = smp::submit_to(shard, [this] () { + return _global_table->calculate_generation_for_new_table(); }).get0(); - return make_sstable(*global_table, sstdir, gen, sst_version); + return make_sstable(*_global_table, sstdir, gen, _highest_version); }, default_priority_class()); // The node is offline at this point so we are very lenient with what we consider @@ -727,18 +693,18 @@ future<> distributed_loader::populate_column_family(table_population_metadata& m return sst->get_origin() != sstables::repair_origin; }; - co_await reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { - auto gen = global_table->calculate_generation_for_new_table(); - return make_sstable(*global_table, sstdir, gen, sst_version); + co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, sstdir] (shard_id shard) { + auto gen = _global_table->calculate_generation_for_new_table(); + return make_sstable(*_global_table, sstdir, gen, _highest_version); }, eligible_for_reshape_on_boot, default_priority_class()); - co_await directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { - co_await dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { + co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { + co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst)); - return global_table->add_sstable_and_update_cache(sst, requires_offstrategy); + return _global_table->add_sstable_and_update_cache(sst, requires_offstrategy); }); if (do_allow_offstrategy_compaction) { - global_table->trigger_offstrategy_compaction(); + _global_table->trigger_offstrategy_compaction(); } }); } @@ -763,16 +729,13 @@ future<> distributed_loader::populate_keyspace(distributed& d auto sstdir = ks.column_family_directory(ksdir, cfname, uuid); dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version()); - auto metadata = table_population_metadata(db, ks_name, cfname); + auto metadata = table_populator(db, ks_name, cfname); std::exception_ptr ex; try { co_await ks.make_directory_for_column_family(cfname, uuid); co_await metadata.start(); - co_await distributed_loader::populate_column_family(metadata, sstables::staging_dir, allow_offstrategy_compaction::no); - co_await distributed_loader::populate_column_family(metadata, sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no); - co_await distributed_loader::populate_column_family(metadata, "", allow_offstrategy_compaction::yes); } catch (...) { std::exception_ptr eptr = std::current_exception(); std::string msg = diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index f3904ac945..8529cf2176 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -60,11 +60,11 @@ class distributed_loader_for_tests; namespace replica { -class table_population_metadata; +class table_populator; class distributed_loader { friend class ::distributed_loader_for_tests; - friend class table_population_metadata; + friend class table_populator; static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function filter, io_priority_class iop); @@ -75,9 +75,6 @@ class distributed_loader { static future make_sstables_available(sstables::sstable_directory& dir, sharded& db, sharded& view_update_generator, std::filesystem::path datadir, sstring ks, sstring cf); - using allow_offstrategy_compaction = bool_class; - using must_exist = bool_class; - static future<> populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);