From 16fca3fa8a62a792fe038f2d239d98fc9c310902 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 14 Feb 2023 15:31:07 +0300 Subject: [PATCH 1/5] distributed_loader: Move populate_column_family() into population meta This ownership change also requires the auto& = *this alias and extra specification where to call reshard() and reshape() from. Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 17 +++++++++++------ replica/distributed_loader.hh | 3 --- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 18ab981449..c5e376b186 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -626,6 +626,10 @@ public: return _highest_generation; } + using allow_offstrategy_compaction = bool_class; + using must_exist = bool_class; + future<> populate_column_family(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); + private: future<> start_subdir(sstring subdir); }; @@ -684,7 +688,8 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab return table.get_sstables_manager().make_sstable(table.schema(), dir.native(), generation, v, sstables::sstable_format_types::big); } -future<> distributed_loader::populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { +future<> table_population_metadata::populate_column_family(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { + auto& metadata = *this; auto& db = metadata.db(); const auto& ks = metadata.ks(); const auto& cf = metadata.cf(); @@ -707,7 +712,7 @@ future<> distributed_loader::populate_column_family(table_population_metadata& m auto& directory = *metadata.sstable_directories().at(subdir); auto sst_version = metadata.highest_version(); - co_await reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { + co_await distributed_loader::reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { auto gen = smp::submit_to(shard, [&global_table] () { return global_table->calculate_generation_for_new_table(); }).get0(); @@ -727,7 +732,7 @@ future<> distributed_loader::populate_column_family(table_population_metadata& m return sst->get_origin() != sstables::repair_origin; }; - co_await reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { + co_await distributed_loader::reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { auto gen = global_table->calculate_generation_for_new_table(); return make_sstable(*global_table, sstdir, gen, sst_version); }, eligible_for_reshape_on_boot, default_priority_class()); @@ -770,9 +775,9 @@ future<> distributed_loader::populate_keyspace(distributed& d co_await ks.make_directory_for_column_family(cfname, uuid); co_await metadata.start(); - co_await distributed_loader::populate_column_family(metadata, sstables::staging_dir, allow_offstrategy_compaction::no); - co_await distributed_loader::populate_column_family(metadata, sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no); - co_await distributed_loader::populate_column_family(metadata, "", allow_offstrategy_compaction::yes); + co_await metadata.populate_column_family(sstables::staging_dir, table_population_metadata::allow_offstrategy_compaction::no); + co_await metadata.populate_column_family(sstables::quarantine_dir, table_population_metadata::allow_offstrategy_compaction::no, table_population_metadata::must_exist::no); + co_await metadata.populate_column_family("", table_population_metadata::allow_offstrategy_compaction::yes); } catch (...) { std::exception_ptr eptr = std::current_exception(); std::string msg = diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index f3904ac945..d47b1d5646 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -75,9 +75,6 @@ class distributed_loader { static future make_sstables_available(sstables::sstable_directory& dir, sharded& db, sharded& view_update_generator, std::filesystem::path datadir, sstring ks, sstring cf); - using allow_offstrategy_compaction = bool_class; - using must_exist = bool_class; - static future<> populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); static future<> handle_sstables_pending_delete(sstring pending_deletes_dir); From 123a82adb2b2c293a75bda06b21c88705c8710a1 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 14 Feb 2023 15:37:09 +0300 Subject: [PATCH 2/5] distributed_loader: Remove local aliases and exporters After previous patch all local alias references in populate_column_family() are no longer requires. Neither are the exporting calls from the table_population_metadata class. Some non-obvious change is capturing 'this' instead of 'global_table' on calls that are cross-shard. That's OK, table_population_metadata is not sharded<> and is designed for cross-shard usage too. Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 72 +++++++++-------------------------- 1 file changed, 17 insertions(+), 55 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index c5e376b186..4b5686a5f4 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -594,38 +594,6 @@ public: return subdir.empty() ? _base_path : _base_path / subdir; } - distributed& db() noexcept { - return _db; - } - - const sstring& ks() const noexcept { - return _ks; - } - - const sstring& cf() const noexcept { - return _cf; - } - - global_column_family_ptr& global_table() noexcept { - return _global_table; - }; - - const global_column_family_ptr& global_table() const noexcept { - return _global_table; - }; - - const std::unordered_map>>& sstable_directories() const noexcept { - return _sstable_directories; - } - - sstables::sstable::version_types highest_version() const noexcept { - return _highest_version; - } - - sstables::generation_type highest_generation() const noexcept { - return _highest_generation; - } - using allow_offstrategy_compaction = bool_class; using must_exist = bool_class; future<> populate_column_family(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); @@ -689,35 +657,29 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab } future<> table_population_metadata::populate_column_family(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { - auto& metadata = *this; - auto& db = metadata.db(); - const auto& ks = metadata.ks(); - const auto& cf = metadata.cf(); - auto sstdir = metadata.get_path(subdir); - dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", ks, cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); + auto sstdir = get_path(subdir); + dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); assert(this_shard_id() == 0); if (!co_await file_exists(sstdir.native())) { if (dir_must_exist) { - throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", metadata.ks(), metadata.cf(), sstdir)); + throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", _ks, _cf, sstdir)); } co_return; } - auto& global_table = metadata.global_table(); - if (!metadata.sstable_directories().contains(subdir)) { - dblog.error("Could not find sstables directory {}.{}/{}", ks, cf, subdir); + if (!_sstable_directories.contains(subdir)) { + dblog.error("Could not find sstables directory {}.{}/{}", _ks, _cf, subdir); } - auto& directory = *metadata.sstable_directories().at(subdir); - auto sst_version = metadata.highest_version(); + auto& directory = *_sstable_directories.at(subdir); - co_await distributed_loader::reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { - auto gen = smp::submit_to(shard, [&global_table] () { - return global_table->calculate_generation_for_new_table(); + co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, sstdir] (shard_id shard) mutable { + auto gen = smp::submit_to(shard, [this] () { + return _global_table->calculate_generation_for_new_table(); }).get0(); - return make_sstable(*global_table, sstdir, gen, sst_version); + return make_sstable(*_global_table, sstdir, gen, _highest_version); }, default_priority_class()); // The node is offline at this point so we are very lenient with what we consider @@ -732,18 +694,18 @@ future<> table_population_metadata::populate_column_family(sstring subdir, allow return sst->get_origin() != sstables::repair_origin; }; - co_await distributed_loader::reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { - auto gen = global_table->calculate_generation_for_new_table(); - return make_sstable(*global_table, sstdir, gen, sst_version); + co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, sstdir] (shard_id shard) { + auto gen = _global_table->calculate_generation_for_new_table(); + return make_sstable(*_global_table, sstdir, gen, _highest_version); }, eligible_for_reshape_on_boot, default_priority_class()); - co_await directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { - co_await dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { + co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { + co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst)); - return global_table->add_sstable_and_update_cache(sst, requires_offstrategy); + return _global_table->add_sstable_and_update_cache(sst, requires_offstrategy); }); if (do_allow_offstrategy_compaction) { - global_table->trigger_offstrategy_compaction(); + _global_table->trigger_offstrategy_compaction(); } }); } From eb477a13adb0d255c8fc973d32b7597cbe26dd88 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 14 Feb 2023 15:38:50 +0300 Subject: [PATCH 3/5] distributed_loader: Move populate calls into metadata.start() This makes the metadata class export even shorter API, keeps the three sub-directories scanned in one place and allows removing the zero-shard assertion. Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 4b5686a5f4..fcce200cf4 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -582,6 +582,10 @@ public: _global_table->update_sstables_known_generation(_highest_generation); return _global_table->disable_auto_compaction(); }); + + co_await populate_subdir(sstables::staging_dir, allow_offstrategy_compaction::no); + co_await populate_subdir(sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no); + co_await populate_subdir("", allow_offstrategy_compaction::yes); } future<> stop() { @@ -590,15 +594,15 @@ public: } } +private: fs::path get_path(std::string_view subdir) { return subdir.empty() ? _base_path : _base_path / subdir; } using allow_offstrategy_compaction = bool_class; using must_exist = bool_class; - future<> populate_column_family(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); + future<> populate_subdir(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes); -private: future<> start_subdir(sstring subdir); }; @@ -656,12 +660,10 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab return table.get_sstables_manager().make_sstable(table.schema(), dir.native(), generation, v, sstables::sstable_format_types::big); } -future<> table_population_metadata::populate_column_family(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { +future<> table_population_metadata::populate_subdir(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { auto sstdir = get_path(subdir); dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); - assert(this_shard_id() == 0); - if (!co_await file_exists(sstdir.native())) { if (dir_must_exist) { throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", _ks, _cf, sstdir)); @@ -737,9 +739,6 @@ future<> distributed_loader::populate_keyspace(distributed& d co_await ks.make_directory_for_column_family(cfname, uuid); co_await metadata.start(); - co_await metadata.populate_column_family(sstables::staging_dir, table_population_metadata::allow_offstrategy_compaction::no); - co_await metadata.populate_column_family(sstables::quarantine_dir, table_population_metadata::allow_offstrategy_compaction::no, table_population_metadata::must_exist::no); - co_await metadata.populate_column_family("", table_population_metadata::allow_offstrategy_compaction::yes); } catch (...) { std::exception_ptr eptr = std::current_exception(); std::string msg = From 15926b22f42c84c9075595e77dfe32eaca19b92f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 14 Feb 2023 15:42:34 +0300 Subject: [PATCH 4/5] distributed_loader: Dont check for directory presense twice Both start_subdir() and populate_subdir() check for the directory to exist with explicit file_exists() check. That's excessive, if the directory wasn't there in the former call, the latter can get this by checking the _sstable_directories map. Signed-off-by: Pavel Emelyanov --- replica/distributed_loader.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index fcce200cf4..0414c98285 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -664,16 +664,13 @@ future<> table_population_metadata::populate_subdir(sstring subdir, allow_offstr auto sstdir = get_path(subdir); dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); - if (!co_await file_exists(sstdir.native())) { + if (!_sstable_directories.contains(subdir)) { if (dir_must_exist) { throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", _ks, _cf, sstdir)); } co_return; } - if (!_sstable_directories.contains(subdir)) { - dblog.error("Could not find sstables directory {}.{}/{}", _ks, _cf, subdir); - } auto& directory = *_sstable_directories.at(subdir); co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, sstdir] (shard_id shard) mutable { From 0c7efe38e11f646233bdee791d5b29488cee687e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 15 Feb 2023 20:14:18 +0300 Subject: [PATCH 5/5] distributed_loader: Rename table_population_metadata It used to be just metadata by providing the meta for population, now it does the population by itself, so rename it. Signed-off-by: Pavel Emelyanov --- replica/database.hh | 4 ++-- replica/distributed_loader.cc | 16 ++++++++-------- replica/distributed_loader.hh | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index 75fc94ac3c..bbca6d0975 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -274,7 +274,7 @@ using sstable_list = sstables::sstable_list; namespace replica { class distributed_loader; -struct table_population_metadata; +class table_populator; // The CF has a "stats" structure. But we don't want all fields here, // since some of them are fairly complex for exporting to collectd. Also, @@ -1094,7 +1094,7 @@ public: friend class ::column_family_test; friend class distributed_loader; - friend class table_population_metadata; + friend class table_populator; private: timer<> _off_strategy_trigger; diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 0414c98285..d4e9c64634 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -547,7 +547,7 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele co_await when_all_succeed(futures.begin(), futures.end()).discard_result(); } -class table_population_metadata { +class table_populator { distributed& _db; sstring _ks; sstring _cf; @@ -558,7 +558,7 @@ class table_population_metadata { sstables::generation_type _highest_generation = sstables::generation_from_value(0); public: - table_population_metadata(distributed& db, sstring ks, sstring cf) + table_populator(distributed& db, sstring ks, sstring cf) : _db(db) , _ks(std::move(ks)) , _cf(std::move(cf)) @@ -566,9 +566,9 @@ public: , _base_path(_global_table->dir()) {} - ~table_population_metadata() { + ~table_populator() { // All directories must have been stopped - // using table_population_metadata::stop() + // using table_populator::stop() assert(_sstable_directories.empty()); } @@ -606,7 +606,7 @@ private: future<> start_subdir(sstring subdir); }; -future<> table_population_metadata::start_subdir(sstring subdir) { +future<> table_populator::start_subdir(sstring subdir) { sstring sstdir = get_path(subdir).native(); if (!co_await file_exists(sstdir)) { co_return; @@ -631,7 +631,7 @@ future<> table_population_metadata::start_subdir(sstring subdir) { default_io_error_handler_gen() ); - // directory must be stopped using table_population_metadata::stop below + // directory must be stopped using table_populator::stop below _sstable_directories[subdir] = dptr; co_await distributed_loader::lock_table(directory, _db, _ks, _cf); @@ -660,7 +660,7 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab return table.get_sstables_manager().make_sstable(table.schema(), dir.native(), generation, v, sstables::sstable_format_types::big); } -future<> table_population_metadata::populate_subdir(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { +future<> table_populator::populate_subdir(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) { auto sstdir = get_path(subdir); dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist); @@ -729,7 +729,7 @@ future<> distributed_loader::populate_keyspace(distributed& d auto sstdir = ks.column_family_directory(ksdir, cfname, uuid); dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version()); - auto metadata = table_population_metadata(db, ks_name, cfname); + auto metadata = table_populator(db, ks_name, cfname); std::exception_ptr ex; try { diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index d47b1d5646..8529cf2176 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -60,11 +60,11 @@ class distributed_loader_for_tests; namespace replica { -class table_population_metadata; +class table_populator; class distributed_loader { friend class ::distributed_loader_for_tests; - friend class table_population_metadata; + friend class table_populator; static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function filter, io_priority_class iop);