Merge 'Move population code into table_population_metadata' from Pavel Emelyanov

There's the distribtued_loader::populate_column_family() helper that manages sstables on their way towards table on boot. The method naturally belongs the the table_population_metadata -- a helper class that in fact prepares the ground for the method in question.

This PR moves the method into metadata class and removes whole lot of extra alias-references and private-fields exporting methods from it. Also it keeps start_subdir and populate_c._f. logic close to each other and relaxes several excessive checks from them.

Closes #12847

* github.com:scylladb/scylladb:
  distributed_loader: Rename table_population_metadata
  distributed_loader: Dont check for directory presense twice
  distributed_loader: Move populate calls into metadata.start()
  distributed_loader: Remove local aliases and exporters
  distributed_loader: Move populate_column_family() into population meta
This commit is contained in:
Avi Kivity
2023-02-15 22:55:48 +02:00
3 changed files with 36 additions and 76 deletions

View File

@@ -274,7 +274,7 @@ using sstable_list = sstables::sstable_list;
namespace replica {
class distributed_loader;
struct table_population_metadata;
class table_populator;
// The CF has a "stats" structure. But we don't want all fields here,
// since some of them are fairly complex for exporting to collectd. Also,
@@ -1094,7 +1094,7 @@ public:
friend class ::column_family_test;
friend class distributed_loader;
friend class table_population_metadata;
friend class table_populator;
private:
timer<> _off_strategy_trigger;

View File

@@ -547,7 +547,7 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
class table_population_metadata {
class table_populator {
distributed<replica::database>& _db;
sstring _ks;
sstring _cf;
@@ -558,7 +558,7 @@ class table_population_metadata {
sstables::generation_type _highest_generation = sstables::generation_from_value(0);
public:
table_population_metadata(distributed<replica::database>& db, sstring ks, sstring cf)
table_populator(distributed<replica::database>& db, sstring ks, sstring cf)
: _db(db)
, _ks(std::move(ks))
, _cf(std::move(cf))
@@ -566,9 +566,9 @@ public:
, _base_path(_global_table->dir())
{}
~table_population_metadata() {
~table_populator() {
// All directories must have been stopped
// using table_population_metadata::stop()
// using table_populator::stop()
assert(_sstable_directories.empty());
}
@@ -582,6 +582,10 @@ public:
_global_table->update_sstables_known_generation(_highest_generation);
return _global_table->disable_auto_compaction();
});
co_await populate_subdir(sstables::staging_dir, allow_offstrategy_compaction::no);
co_await populate_subdir(sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no);
co_await populate_subdir("", allow_offstrategy_compaction::yes);
}
future<> stop() {
@@ -590,47 +594,19 @@ public:
}
}
private:
fs::path get_path(std::string_view subdir) {
return subdir.empty() ? _base_path : _base_path / subdir;
}
distributed<replica::database>& db() noexcept {
return _db;
}
using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;
using must_exist = bool_class<struct must_exist_tag>;
future<> populate_subdir(sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes);
const sstring& ks() const noexcept {
return _ks;
}
const sstring& cf() const noexcept {
return _cf;
}
global_column_family_ptr& global_table() noexcept {
return _global_table;
};
const global_column_family_ptr& global_table() const noexcept {
return _global_table;
};
const std::unordered_map<sstring, lw_shared_ptr<sharded<sstables::sstable_directory>>>& sstable_directories() const noexcept {
return _sstable_directories;
}
sstables::sstable::version_types highest_version() const noexcept {
return _highest_version;
}
sstables::generation_type highest_generation() const noexcept {
return _highest_generation;
}
private:
future<> start_subdir(sstring subdir);
};
future<> table_population_metadata::start_subdir(sstring subdir) {
future<> table_populator::start_subdir(sstring subdir) {
sstring sstdir = get_path(subdir).native();
if (!co_await file_exists(sstdir)) {
co_return;
@@ -655,7 +631,7 @@ future<> table_population_metadata::start_subdir(sstring subdir) {
default_io_error_handler_gen()
);
// directory must be stopped using table_population_metadata::stop below
// directory must be stopped using table_populator::stop below
_sstable_directories[subdir] = dptr;
co_await distributed_loader::lock_table(directory, _db, _ks, _cf);
@@ -684,35 +660,25 @@ sstables::shared_sstable make_sstable(replica::table& table, fs::path dir, sstab
return table.get_sstables_manager().make_sstable(table.schema(), dir.native(), generation, v, sstables::sstable_format_types::big);
}
future<> distributed_loader::populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
auto& db = metadata.db();
const auto& ks = metadata.ks();
const auto& cf = metadata.cf();
auto sstdir = metadata.get_path(subdir);
dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", ks, cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist);
future<> table_populator::populate_subdir(sstring subdir, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
auto sstdir = get_path(subdir);
dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", _ks, _cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist);
assert(this_shard_id() == 0);
if (!co_await file_exists(sstdir.native())) {
if (!_sstable_directories.contains(subdir)) {
if (dir_must_exist) {
throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", metadata.ks(), metadata.cf(), sstdir));
throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", _ks, _cf, sstdir));
}
co_return;
}
auto& global_table = metadata.global_table();
if (!metadata.sstable_directories().contains(subdir)) {
dblog.error("Could not find sstables directory {}.{}/{}", ks, cf, subdir);
}
auto& directory = *metadata.sstable_directories().at(subdir);
auto sst_version = metadata.highest_version();
auto& directory = *_sstable_directories.at(subdir);
co_await reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [&global_table] () {
return global_table->calculate_generation_for_new_table();
co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, sstdir] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [this] () {
return _global_table->calculate_generation_for_new_table();
}).get0();
return make_sstable(*global_table, sstdir, gen, sst_version);
return make_sstable(*_global_table, sstdir, gen, _highest_version);
}, default_priority_class());
// The node is offline at this point so we are very lenient with what we consider
@@ -727,18 +693,18 @@ future<> distributed_loader::populate_column_family(table_population_metadata& m
return sst->get_origin() != sstables::repair_origin;
};
co_await reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) {
auto gen = global_table->calculate_generation_for_new_table();
return make_sstable(*global_table, sstdir, gen, sst_version);
co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, sstdir] (shard_id shard) {
auto gen = _global_table->calculate_generation_for_new_table();
return make_sstable(*_global_table, sstdir, gen, _highest_version);
}, eligible_for_reshape_on_boot, default_priority_class());
co_await directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> {
co_await dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) {
co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> {
co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) {
auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst));
return global_table->add_sstable_and_update_cache(sst, requires_offstrategy);
return _global_table->add_sstable_and_update_cache(sst, requires_offstrategy);
});
if (do_allow_offstrategy_compaction) {
global_table->trigger_offstrategy_compaction();
_global_table->trigger_offstrategy_compaction();
}
});
}
@@ -763,16 +729,13 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
auto metadata = table_population_metadata(db, ks_name, cfname);
auto metadata = table_populator(db, ks_name, cfname);
std::exception_ptr ex;
try {
co_await ks.make_directory_for_column_family(cfname, uuid);
co_await metadata.start();
co_await distributed_loader::populate_column_family(metadata, sstables::staging_dir, allow_offstrategy_compaction::no);
co_await distributed_loader::populate_column_family(metadata, sstables::quarantine_dir, allow_offstrategy_compaction::no, must_exist::no);
co_await distributed_loader::populate_column_family(metadata, "", allow_offstrategy_compaction::yes);
} catch (...) {
std::exception_ptr eptr = std::current_exception();
std::string msg =

View File

@@ -60,11 +60,11 @@ class distributed_loader_for_tests;
namespace replica {
class table_population_metadata;
class table_populator;
class distributed_loader {
friend class ::distributed_loader_for_tests;
friend class table_population_metadata;
friend class table_populator;
static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstables::reshape_mode mode,
sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function<bool (const sstables::shared_sstable&)> filter, io_priority_class iop);
@@ -75,9 +75,6 @@ class distributed_loader {
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
std::filesystem::path datadir, sstring ks, sstring cf);
using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;
using must_exist = bool_class<struct must_exist_tag>;
static future<> populate_column_family(table_population_metadata& metadata, sstring subdir, allow_offstrategy_compaction, must_exist = must_exist::yes);
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);