sstables: sstable_directory: Load SSTable at the shard that actually owns it

Today, the SSTable generation provides a hint on which shard owns a
particular SSTable. That hint determines which shard will load the
SSTable into memory.

With upcoming UUID generation, we will no longer have this hint
embedded into the SSTable generation, meaning that SSTables will be
loaded at random shards. This is not good because shards will have
to reference memory from other shards to access the SSTable
metadata that was allocated elsewhere.

This patch changes sstable_directory to:
1) Use generation value to only determine which shard will calculate
the owner shards for SSTables. Essentially works like a round-robin
distribution.
2) The shard assigned to compute the owners for an SSTable will do
so by reading the minimum from disk; usually only the Scylla file is
needed.
3) Once that shard has finished computing the owners, it will forward
the SSTable to the shard that owns it.
4) Shards will later load SSTables locally that were forwarded to
them.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2023-03-08 21:04:43 -03:00
parent 2c4e141314
commit 0c77f77659
2 changed files with 32 additions and 19 deletions

View File

@@ -140,23 +140,32 @@ sstable_directory::process_descriptor(sstables::entry_descriptor desc, process_f
}
}
// Computes the shards that own the SSTable described by `desc`.
//
// An sstable object is created from the descriptor, but only its owner-shard
// information is loaded (load_owner_shards) before it is validated against
// `flags`. The resulting list of owner shards is returned to the caller.
future<std::vector<shard_id>> sstable_directory::get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const {
    auto sstable = _manager.make_sstable(
            _schema,
            _sstable_dir.native(),
            desc.generation,
            desc.version,
            desc.format,
            gc_clock::now(),
            _error_handler_gen);
    // Owner shards must be loaded before validating and querying them.
    co_await sstable->load_owner_shards(_io_priority);
    validate(sstable, flags);
    co_return sstable->get_shards_for_this_sstable();
}
// Loads the SSTable described by `desc` and returns its
// foreign_sstable_open_info, as produced by sstable::get_open_info().
future<foreign_sstable_open_info> sstable_directory::get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const {
    // `desc` is a const reference: std::move() on it could never move and
    // would only hide the copy, so the descriptor is passed as-is.
    auto sst = co_await load_sstable(desc);
    co_return co_await sst->get_open_info();
}
future<>
sstable_directory::sort_sstable(sstables::entry_descriptor desc, process_flags flags) {
auto sst = co_await load_sstable(desc, flags);
sstables::foreign_sstable_open_info info = co_await sst->get_open_info();
auto shards = sst->get_shards_for_this_sstable();
auto shards = co_await get_shards_for_this_sstable(desc, flags);
if (shards.size() == 1) {
if (shards[0] == this_shard_id()) {
dirlog.trace("{} identified as a local unshared SSTable", sst->get_filename());
_unshared_local_sstables.push_back(sst);
dirlog.trace("{} identified as a local unshared SSTable", sstable_filename(desc));
_unshared_local_sstables.push_back(co_await load_sstable(std::move(desc), flags));
} else {
dirlog.trace("{} identified as a remote unshared SSTable", sst->get_filename());
_unshared_remote_sstables[shards[0]].push_back(std::move(info));
dirlog.trace("{} identified as a remote unshared SSTable", sstable_filename(desc));
_unshared_remote_sstables[shards[0]].push_back(std::move(desc));
}
} else {
dirlog.trace("{} identified as a shared SSTable", sst->get_filename());
_shared_sstable_info.push_back(std::move(info));
dirlog.trace("{} identified as a shared SSTable", sstable_filename(desc));
_shared_sstable_info.push_back(co_await get_open_info_for_this_sstable(desc));
}
}
@@ -290,9 +299,9 @@ future<shared_sstable> sstable_directory::load_foreign_sstable(foreign_sstable_o
}
future<>
sstable_directory::load_foreign_sstables(sstable_open_info_vector info_vec) {
return parallel_for_each_restricted(info_vec, [this] (sstables::foreign_sstable_open_info& info) {
return load_foreign_sstable(info).then([this] (auto sst) {
sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) {
return parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) {
return load_sstable(info).then([this] (auto sst) {
_unshared_local_sstables.push_back(sst);
return make_ready_future<>();
});
@@ -327,10 +336,8 @@ sstable_directory::collect_output_unshared_sstables(std::vector<sstables::shared
}
dirlog.trace("Collected output SSTable {} is remote. Storing it", sst->get_filename());
return sst->get_open_info().then([this, shard, sst] (sstables::foreign_sstable_open_info info) {
_unshared_remote_sstables[shard].push_back(std::move(info));
return make_ready_future<>();
});
_unshared_remote_sstables[shard].push_back(sstables::entry_descriptor::make_descriptor(_sstable_dir.native(), sst->component_basename(component_type::Data)));
return make_ready_future<>();
});
}

View File

@@ -54,6 +54,7 @@ public:
// favor chunked vectors when dealing with file lists: they can grow to hundreds of thousands
// of elements.
using sstable_open_info_vector = utils::chunked_vector<sstables::foreign_sstable_open_info>;
using sstable_entry_descriptor_vector = utils::chunked_vector<sstables::entry_descriptor>;
// Flags below control how to behave when scanning new SSTables.
struct process_flags {
@@ -122,7 +123,7 @@ private:
//
// The indexes of the outer vector represent the shards. Having anything in the index
// representing this shard is illegal.
std::vector<sstable_open_info_vector> _unshared_remote_sstables;
std::vector<sstable_entry_descriptor_vector> _unshared_remote_sstables;
// SSTables that are shared. Stored as foreign_sstable_open_info objects. Reason is those are
// the SSTables that we found, and not necessarily the ones we will reshard. We want to balance
@@ -139,13 +140,18 @@ private:
template <typename Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::decay_t<Container>::value_type&>
future<> parallel_for_each_restricted(Container&& C, Func&& func);
future<> load_foreign_sstables(sstable_open_info_vector info_vec);
future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec);
// Sort the sstable according to owner
future<> sort_sstable(sstables::entry_descriptor desc, process_flags flags);
// Returns filename for a SSTable from its entry_descriptor.
sstring sstable_filename(const sstables::entry_descriptor& desc) const;
// Compute the owner shards for a particular SSTable.
future<std::vector<shard_id>> get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const;
// Retrieves sstables::foreign_sstable_open_info for a particular SSTable.
future<foreign_sstable_open_info> get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const;
public:
sstable_directory(sstables_manager& manager,
schema_ptr schema,