sstables: sstable_directory: Load SSTable at the shard that actually own it
Today, the SSTable generation provides a hint on which shard owns a particular SSTable. That hint determines which shard will load the SSTable into memory. With upcoming UUID generation, we will no longer have this hint embedded into the SSTable generation, meaning that SSTables will be loaded at random shards. This is not good because shards will have to reference memory from other shards to access the SSTable metadata that was allocated elsewhere. This patch changes sstable_directory to: 1) Use generation value to only determine which shard will calculate the owner shards for SSTables. Essentially works like a round-robin distribution. 2) The shard assigned to compute the owners for a SSTable will do so reading the minimum from disk, usually only Scylla file is needed. 3) Once that shard finished computing the owners, it will forward the SSTable to the shard that own it. 4) Shards will later load SSTables locally that were forwarded to them. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -140,23 +140,32 @@ sstable_directory::process_descriptor(sstables::entry_descriptor desc, process_f
|
||||
}
|
||||
}
|
||||
|
||||
future<std::vector<shard_id>> sstable_directory::get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const {
|
||||
auto sst = _manager.make_sstable(_schema, _sstable_dir.native(), desc.generation, desc.version, desc.format, gc_clock::now(), _error_handler_gen);
|
||||
co_await sst->load_owner_shards(_io_priority);
|
||||
validate(sst, flags);
|
||||
co_return sst->get_shards_for_this_sstable();
|
||||
}
|
||||
|
||||
future<foreign_sstable_open_info> sstable_directory::get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const {
|
||||
auto sst = co_await load_sstable(std::move(desc));
|
||||
co_return co_await sst->get_open_info();
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable_directory::sort_sstable(sstables::entry_descriptor desc, process_flags flags) {
|
||||
auto sst = co_await load_sstable(desc, flags);
|
||||
sstables::foreign_sstable_open_info info = co_await sst->get_open_info();
|
||||
|
||||
auto shards = sst->get_shards_for_this_sstable();
|
||||
auto shards = co_await get_shards_for_this_sstable(desc, flags);
|
||||
if (shards.size() == 1) {
|
||||
if (shards[0] == this_shard_id()) {
|
||||
dirlog.trace("{} identified as a local unshared SSTable", sst->get_filename());
|
||||
_unshared_local_sstables.push_back(sst);
|
||||
dirlog.trace("{} identified as a local unshared SSTable", sstable_filename(desc));
|
||||
_unshared_local_sstables.push_back(co_await load_sstable(std::move(desc), flags));
|
||||
} else {
|
||||
dirlog.trace("{} identified as a remote unshared SSTable", sst->get_filename());
|
||||
_unshared_remote_sstables[shards[0]].push_back(std::move(info));
|
||||
dirlog.trace("{} identified as a remote unshared SSTable", sstable_filename(desc));
|
||||
_unshared_remote_sstables[shards[0]].push_back(std::move(desc));
|
||||
}
|
||||
} else {
|
||||
dirlog.trace("{} identified as a shared SSTable", sst->get_filename());
|
||||
_shared_sstable_info.push_back(std::move(info));
|
||||
dirlog.trace("{} identified as a shared SSTable", sstable_filename(desc));
|
||||
_shared_sstable_info.push_back(co_await get_open_info_for_this_sstable(desc));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,9 +299,9 @@ future<shared_sstable> sstable_directory::load_foreign_sstable(foreign_sstable_o
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable_directory::load_foreign_sstables(sstable_open_info_vector info_vec) {
|
||||
return parallel_for_each_restricted(info_vec, [this] (sstables::foreign_sstable_open_info& info) {
|
||||
return load_foreign_sstable(info).then([this] (auto sst) {
|
||||
sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) {
|
||||
return parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) {
|
||||
return load_sstable(info).then([this] (auto sst) {
|
||||
_unshared_local_sstables.push_back(sst);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -327,10 +336,8 @@ sstable_directory::collect_output_unshared_sstables(std::vector<sstables::shared
|
||||
}
|
||||
|
||||
dirlog.trace("Collected output SSTable {} is remote. Storing it", sst->get_filename());
|
||||
return sst->get_open_info().then([this, shard, sst] (sstables::foreign_sstable_open_info info) {
|
||||
_unshared_remote_sstables[shard].push_back(std::move(info));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
_unshared_remote_sstables[shard].push_back(sstables::entry_descriptor::make_descriptor(_sstable_dir.native(), sst->component_basename(component_type::Data)));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ public:
|
||||
// favor chunked vectors when dealing with file lists: they can grow to hundreds of thousands
|
||||
// of elements.
|
||||
using sstable_open_info_vector = utils::chunked_vector<sstables::foreign_sstable_open_info>;
|
||||
using sstable_entry_descriptor_vector = utils::chunked_vector<sstables::entry_descriptor>;
|
||||
|
||||
// Flags below control how to behave when scanning new SSTables.
|
||||
struct process_flags {
|
||||
@@ -122,7 +123,7 @@ private:
|
||||
//
|
||||
// The indexes of the outer vector represent the shards. Having anything in the index
|
||||
// representing this shard is illegal.
|
||||
std::vector<sstable_open_info_vector> _unshared_remote_sstables;
|
||||
std::vector<sstable_entry_descriptor_vector> _unshared_remote_sstables;
|
||||
|
||||
// SSTables that are shared. Stored as foreign_sstable_open_info objects. Reason is those are
|
||||
// the SSTables that we found, and not necessarily the ones we will reshard. We want to balance
|
||||
@@ -139,13 +140,18 @@ private:
|
||||
template <typename Container, typename Func>
|
||||
requires std::is_invocable_r_v<future<>, Func, typename std::decay_t<Container>::value_type&>
|
||||
future<> parallel_for_each_restricted(Container&& C, Func&& func);
|
||||
future<> load_foreign_sstables(sstable_open_info_vector info_vec);
|
||||
future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec);
|
||||
|
||||
// Sort the sstable according to owner
|
||||
future<> sort_sstable(sstables::entry_descriptor desc, process_flags flags);
|
||||
|
||||
// Returns filename for a SSTable from its entry_descriptor.
|
||||
sstring sstable_filename(const sstables::entry_descriptor& desc) const;
|
||||
|
||||
// Compute owner of shards for a particular SSTable.
|
||||
future<std::vector<shard_id>> get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const;
|
||||
// Retrieves sstables::foreign_sstable_open_info for a particular SSTable.
|
||||
future<foreign_sstable_open_info> get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const;
|
||||
public:
|
||||
sstable_directory(sstables_manager& manager,
|
||||
schema_ptr schema,
|
||||
|
||||
Reference in New Issue
Block a user