distributed_loader: remove unused code

Remove code no longer used by population procedure.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2020-06-25 14:28:06 -03:00
parent 60467a7e36
commit 6dfeb107ae
2 changed files with 0 additions and 105 deletions

View File

@@ -461,107 +461,6 @@ distributed_loader::process_upload_dir(distributed<database>& db, distributed<db
});
}
future<> distributed_loader::open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func, const io_priority_class& pc) {
// loads components of a sstable from shard S and share it with all other
// shards. Which shard a sstable will be opened at is decided using
// calculate_shard_from_sstable_generation(), which is the inverse of
// calculate_generation_for_new_table(). That ensures every sstable is
// shard-local if reshard wasn't performed. This approach is also expected
// to distribute evenly the resource usage among all shards.
static thread_local std::unordered_map<sstring, named_semaphore> named_semaphores;
return db.invoke_on(column_family::calculate_shard_from_sstable_generation(comps.generation),
[&db, comps = std::move(comps), func = std::move(func), &pc] (database& local) {
// check if we should bypass concurrency throttle for this keyspace
// we still only allow a single sstable per shard extra to be loaded,
// to avoid concurrency explosion
auto& sem = load_prio_keyspaces.count(comps.ks)
? named_semaphores.try_emplace(comps.ks, 1, named_semaphore_exception_factory{comps.ks}).first->second
: local.sstable_load_concurrency_sem()
;
return with_semaphore(sem, 1, [&db, &local, comps = std::move(comps), func = std::move(func), &pc] {
auto& cf = local.find_column_family(comps.ks, comps.cf);
auto sst = cf.make_sstable(comps.sstdir, comps.generation, comps.version, comps.format);
auto f = sst->load(pc).then([sst = std::move(sst)] {
return sst->load_shared_components();
});
return f.then([&db, comps = std::move(comps), func = std::move(func)] (sstables::sstable_open_info info) {
// shared components loaded, now opening sstable in all shards that own it with shared components
return do_with(std::move(info), [&db, comps = std::move(comps), func = std::move(func)] (auto& info) {
// All shards that own the sstable is interested in it in addition to shard that
// is responsible for its generation. We may need to add manually this shard
// because sstable may not contain data that belong to it.
auto shards_interested_in_this_sstable = boost::copy_range<std::unordered_set<shard_id>>(info.owners);
shard_id shard_responsible_for_generation = column_family::calculate_shard_from_sstable_generation(comps.generation);
shards_interested_in_this_sstable.insert(shard_responsible_for_generation);
return invoke_shards_with_ptr(std::move(shards_interested_in_this_sstable), db, std::move(info.components),
[owners = info.owners, data = info.data.dup(), index = info.index.dup(), comps, func] (database& db, auto components) {
auto& cf = db.find_column_family(comps.ks, comps.cf);
return func(cf, sstables::foreign_sstable_open_info{std::move(components), owners, data, index});
});
});
});
});
});
}
future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<database>& db, sstring sstdir, sstring fname) {
using namespace sstables;
entry_descriptor comps = entry_descriptor::make_descriptor(sstdir, fname);
// Every table will have a TOC. Using a specific file as a criteria, as
// opposed to, say verifying _sstables.count() to be zero is more robust
// against parallel loading of the directory contents.
if (comps.component != component_type::TOC) {
return make_ready_future<entry_descriptor>(std::move(comps));
}
auto cf_sstable_open = [sstdir, comps, fname] (column_family& cf, sstables::foreign_sstable_open_info info) {
cf.update_sstables_known_generation(comps.generation);
if (shared_sstable sst = cf.get_staging_sstable(comps.generation)) {
dblog.warn("SSTable {} is already present in staging/ directory. Moving from staging will be retried.", sst->get_filename());
return sst->move_to_new_dir(comps.sstdir, comps.generation);
}
{
auto i = boost::range::find_if(*cf._sstables->all(), [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; });
if (i != cf._sstables->all()->end()) {
auto new_toc = sstdir + "/" + fname;
throw std::runtime_error(format("Attempted to add sstable generation {:d} twice: new={} existing={}",
comps.generation, new_toc, (*i)->toc_filename()));
}
}
return cf.open_sstable(std::move(info), sstdir, comps.generation, comps.version, comps.format).then([&cf] (sstables::shared_sstable sst) mutable {
if (sst) {
return cf.get_row_cache().invalidate([&cf, sst = std::move(sst)] () mutable noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
cf.load_sstable(sst);
});
}
return make_ready_future<>();
});
};
return distributed_loader::open_sstable(db, comps, cf_sstable_open).then_wrapped([fname] (future<> f) {
try {
f.get();
} catch (malformed_sstable_exception& e) {
dblog.error("malformed sstable {}: {}. Refusing to boot", fname, e.what());
throw;
} catch(...) {
dblog.error("Unrecognized error while processing {}: {}. Refusing to boot",
fname, std::current_exception());
throw;
}
return make_ready_future<>();
}).then([comps] () mutable {
return make_ready_future<entry_descriptor>(std::move(comps));
});
}
static future<> execute_futures(std::vector<future<>>& futures) {
return seastar::when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
std::exception_ptr eptr;

View File

@@ -65,9 +65,6 @@ public:
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring cf_name);
static future<> open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func,
const io_priority_class& pc = default_priority_class());
static future<> verify_owner_and_mode(std::filesystem::path path);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
@@ -75,7 +72,6 @@ public:
std::filesystem::path datadir, sstring ks, sstring cf);
static future<> process_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks_name, sstring cf_name);
static future<sstables::entry_descriptor> probe_file(distributed<database>& db, sstring sstdir, sstring fname);
static future<> populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name);
static future<> init_system_keyspace(distributed<database>& db);