Merge 'Relax sstable_directory::process_descriptor() call graph' from Pavel Emelyanov

The method logic is clean and simple -- load sstable from the descriptor and sort it into one of collections (local, shared, remote, unsorted). To achieve that there's a bunch of helper methods, but they duplicate functionality of each other. Squashing most of this code into process_descriptor() makes it easier to read and keeps sstable_directory private API much shorter.

Closes scylladb/scylladb#20126

* github.com:scylladb/scylladb:
  sstable_directory: Open-code load_sstable() into process_descriptor()
  sstable_directory: Squash sort_sstable() with process_descriptor()
  sstable_directory: Remove unused sstable_filename(desc) helper
  sstable_directory: Log sst->get_filename(), not sstable_filename(desc)
  sstable_directory: Keep loaded sst in local var
  sstable_directory: Remove unused helpers
  sstable_directory: Load sstable once when sorting
This commit is contained in:
Avi Kivity
2024-08-13 16:42:52 +03:00
2 changed files with 17 additions and 51 deletions

View File

@@ -173,63 +173,41 @@ future<sstables::shared_sstable> sstable_directory::load_sstable(sstables::entry
co_return sst;
}
future<sstables::shared_sstable> sstable_directory::load_sstable(sstables::entry_descriptor desc, process_flags flags) const {
auto sst = co_await load_sstable(std::move(desc), flags.sstable_open_config);
validate(sst, flags);
if (flags.need_mutate_level) {
dirlog.trace("Mutating {} to level 0\n", sst->get_filename());
co_await sst->mutate_sstable_level(0);
}
co_return sst;
}
future<>
sstable_directory::process_descriptor(sstables::entry_descriptor desc, process_flags flags) {
if (desc.version > _max_version_seen) {
_max_version_seen = desc.version;
}
if (flags.sort_sstables_according_to_owner) {
co_await sort_sstable(std::move(desc), flags);
} else {
dirlog.debug("Added {} to unsorted sstables list", sstable_filename(desc));
_unsorted_sstables.push_back(co_await load_sstable(std::move(desc), flags));
}
}
future<std::vector<shard_id>> sstable_directory::get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const {
auto sst = _manager.make_sstable(_schema, _table_dir, *_storage_opts, desc.generation, _state, desc.version, desc.format, gc_clock::now(), _error_handler_gen);
co_await sst->load_owner_shards(_sharder);
auto sst = co_await load_sstable(desc, flags.sstable_open_config);
validate(sst, flags);
co_return sst->get_shards_for_this_sstable();
}
future<foreign_sstable_open_info> sstable_directory::get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const {
auto sst = co_await load_sstable(std::move(desc));
co_return co_await sst->get_open_info();
}
if (flags.need_mutate_level) {
dirlog.trace("Mutating {} to level 0\n", sst->get_filename());
co_await sst->mutate_sstable_level(0);
}
future<>
sstable_directory::sort_sstable(sstables::entry_descriptor desc, process_flags flags) {
auto shards = co_await get_shards_for_this_sstable(desc, flags);
if (!flags.sort_sstables_according_to_owner) {
dirlog.debug("Added {} to unsorted sstables list", sst->get_filename());
_unsorted_sstables.push_back(std::move(sst));
co_return;
}
auto shards = sst->get_shards_for_this_sstable();
if (shards.size() == 1) {
if (shards[0] == this_shard_id()) {
dirlog.trace("{} identified as a local unshared SSTable", sstable_filename(desc));
_unshared_local_sstables.push_back(co_await load_sstable(std::move(desc), flags));
dirlog.trace("{} identified as a local unshared SSTable", sst->get_filename());
_unshared_local_sstables.push_back(std::move(sst));
} else {
dirlog.trace("{} identified as a remote unshared SSTable, shard={}", sstable_filename(desc), shards[0]);
dirlog.trace("{} identified as a remote unshared SSTable, shard={}", sst->get_filename(), shards[0]);
_unshared_remote_sstables[shards[0]].push_back(std::move(desc));
}
} else {
dirlog.trace("{} identified as a shared SSTable, shards={}", sstable_filename(desc), shards);
_shared_sstable_info.push_back(co_await get_open_info_for_this_sstable(desc));
dirlog.trace("{} identified as a shared SSTable, shards={}", sst->get_filename(), shards);
_shared_sstable_info.push_back(co_await sst->get_open_info());
}
}
sstring sstable_directory::sstable_filename(const sstables::entry_descriptor& desc) const {
return sstable::filename(make_path(_table_dir, _state).native(), _schema->ks_name(), _schema->cf_name(), desc.version, desc.generation, desc.format, component_type::Data);
}
generation_type
sstable_directory::highest_generation_seen() const {
return _max_generation_seen;

View File

@@ -177,21 +177,9 @@ private:
future<> process_descriptor(sstables::entry_descriptor desc, process_flags flags);
void validate(sstables::shared_sstable sst, process_flags flags) const;
future<sstables::shared_sstable> load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg = {}) const;
future<sstables::shared_sstable> load_sstable(sstables::entry_descriptor desc, process_flags flags) const;
future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec);
// Sort the sstable according to owner
future<> sort_sstable(sstables::entry_descriptor desc, process_flags flags);
// Returns filename for a SSTable from its entry_descriptor.
sstring sstable_filename(const sstables::entry_descriptor& desc) const;
// Compute owner of shards for a particular SSTable.
future<std::vector<shard_id>> get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const;
// Retrieves sstables::foreign_sstable_open_info for a particular SSTable.
future<foreign_sstable_open_info> get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const;
sstable_directory(sstables_manager& manager,
schema_ptr schema,
std::variant<std::unique_ptr<dht::sharder>, const dht::sharder*> sharder,