distributed_loader: remove unused code
Remove code no longer used by population procedure. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -461,107 +461,6 @@ distributed_loader::process_upload_dir(distributed<database>& db, distributed<db
|
||||
});
|
||||
}
|
||||
|
||||
future<> distributed_loader::open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
|
||||
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func, const io_priority_class& pc) {
|
||||
// loads components of a sstable from shard S and share it with all other
|
||||
// shards. Which shard a sstable will be opened at is decided using
|
||||
// calculate_shard_from_sstable_generation(), which is the inverse of
|
||||
// calculate_generation_for_new_table(). That ensures every sstable is
|
||||
// shard-local if reshard wasn't performed. This approach is also expected
|
||||
// to distribute evenly the resource usage among all shards.
|
||||
static thread_local std::unordered_map<sstring, named_semaphore> named_semaphores;
|
||||
|
||||
return db.invoke_on(column_family::calculate_shard_from_sstable_generation(comps.generation),
|
||||
[&db, comps = std::move(comps), func = std::move(func), &pc] (database& local) {
|
||||
|
||||
// check if we should bypass concurrency throttle for this keyspace
|
||||
// we still only allow a single sstable per shard extra to be loaded,
|
||||
// to avoid concurrency explosion
|
||||
auto& sem = load_prio_keyspaces.count(comps.ks)
|
||||
? named_semaphores.try_emplace(comps.ks, 1, named_semaphore_exception_factory{comps.ks}).first->second
|
||||
: local.sstable_load_concurrency_sem()
|
||||
;
|
||||
|
||||
return with_semaphore(sem, 1, [&db, &local, comps = std::move(comps), func = std::move(func), &pc] {
|
||||
auto& cf = local.find_column_family(comps.ks, comps.cf);
|
||||
auto sst = cf.make_sstable(comps.sstdir, comps.generation, comps.version, comps.format);
|
||||
auto f = sst->load(pc).then([sst = std::move(sst)] {
|
||||
return sst->load_shared_components();
|
||||
});
|
||||
return f.then([&db, comps = std::move(comps), func = std::move(func)] (sstables::sstable_open_info info) {
|
||||
// shared components loaded, now opening sstable in all shards that own it with shared components
|
||||
return do_with(std::move(info), [&db, comps = std::move(comps), func = std::move(func)] (auto& info) {
|
||||
// All shards that own the sstable is interested in it in addition to shard that
|
||||
// is responsible for its generation. We may need to add manually this shard
|
||||
// because sstable may not contain data that belong to it.
|
||||
auto shards_interested_in_this_sstable = boost::copy_range<std::unordered_set<shard_id>>(info.owners);
|
||||
shard_id shard_responsible_for_generation = column_family::calculate_shard_from_sstable_generation(comps.generation);
|
||||
shards_interested_in_this_sstable.insert(shard_responsible_for_generation);
|
||||
|
||||
return invoke_shards_with_ptr(std::move(shards_interested_in_this_sstable), db, std::move(info.components),
|
||||
[owners = info.owners, data = info.data.dup(), index = info.index.dup(), comps, func] (database& db, auto components) {
|
||||
auto& cf = db.find_column_family(comps.ks, comps.cf);
|
||||
return func(cf, sstables::foreign_sstable_open_info{std::move(components), owners, data, index});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<database>& db, sstring sstdir, sstring fname) {
|
||||
using namespace sstables;
|
||||
|
||||
entry_descriptor comps = entry_descriptor::make_descriptor(sstdir, fname);
|
||||
|
||||
// Every table will have a TOC. Using a specific file as a criteria, as
|
||||
// opposed to, say verifying _sstables.count() to be zero is more robust
|
||||
// against parallel loading of the directory contents.
|
||||
if (comps.component != component_type::TOC) {
|
||||
return make_ready_future<entry_descriptor>(std::move(comps));
|
||||
}
|
||||
auto cf_sstable_open = [sstdir, comps, fname] (column_family& cf, sstables::foreign_sstable_open_info info) {
|
||||
cf.update_sstables_known_generation(comps.generation);
|
||||
if (shared_sstable sst = cf.get_staging_sstable(comps.generation)) {
|
||||
dblog.warn("SSTable {} is already present in staging/ directory. Moving from staging will be retried.", sst->get_filename());
|
||||
return sst->move_to_new_dir(comps.sstdir, comps.generation);
|
||||
}
|
||||
{
|
||||
auto i = boost::range::find_if(*cf._sstables->all(), [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; });
|
||||
if (i != cf._sstables->all()->end()) {
|
||||
auto new_toc = sstdir + "/" + fname;
|
||||
throw std::runtime_error(format("Attempted to add sstable generation {:d} twice: new={} existing={}",
|
||||
comps.generation, new_toc, (*i)->toc_filename()));
|
||||
}
|
||||
}
|
||||
return cf.open_sstable(std::move(info), sstdir, comps.generation, comps.version, comps.format).then([&cf] (sstables::shared_sstable sst) mutable {
|
||||
if (sst) {
|
||||
return cf.get_row_cache().invalidate([&cf, sst = std::move(sst)] () mutable noexcept {
|
||||
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
||||
cf.load_sstable(sst);
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
};
|
||||
|
||||
return distributed_loader::open_sstable(db, comps, cf_sstable_open).then_wrapped([fname] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (malformed_sstable_exception& e) {
|
||||
dblog.error("malformed sstable {}: {}. Refusing to boot", fname, e.what());
|
||||
throw;
|
||||
} catch(...) {
|
||||
dblog.error("Unrecognized error while processing {}: {}. Refusing to boot",
|
||||
fname, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([comps] () mutable {
|
||||
return make_ready_future<entry_descriptor>(std::move(comps));
|
||||
});
|
||||
}
|
||||
|
||||
static future<> execute_futures(std::vector<future<>>& futures) {
|
||||
return seastar::when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
|
||||
std::exception_ptr eptr;
|
||||
|
||||
@@ -65,9 +65,6 @@ public:
|
||||
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir);
|
||||
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring cf_name);
|
||||
|
||||
static future<> open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
|
||||
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func,
|
||||
const io_priority_class& pc = default_priority_class());
|
||||
static future<> verify_owner_and_mode(std::filesystem::path path);
|
||||
|
||||
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
|
||||
@@ -75,7 +72,6 @@ public:
|
||||
std::filesystem::path datadir, sstring ks, sstring cf);
|
||||
static future<> process_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
distributed<db::view::view_update_generator>& view_update_generator, sstring ks_name, sstring cf_name);
|
||||
static future<sstables::entry_descriptor> probe_file(distributed<database>& db, sstring sstdir, sstring fname);
|
||||
static future<> populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf);
|
||||
static future<> populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name);
|
||||
static future<> init_system_keyspace(distributed<database>& db);
|
||||
|
||||
Reference in New Issue
Block a user