load_new_sstables: reshard before scanning the upload directory

In a later patch we will be able move files directly from upload
into the main directory. However for now, for the benefit of doing
this incrementally, we will first reshard in place with our new
reshard infrastructure.

load_new_sstables can then move the SSTables directly, without having
to worry about resharding. This has the immediate benefit that the
resharding happens:

- in the streaming group, without affecting compaction work
- without waiting for the current locks (which are held by compactions)
  in load_new_sstables to release.

We could, at this point, just move the SSTables to the main directory
right away.

I am not doing this in this patch, and opting to keep the rest of upload
process unchanged. This will be fixed later when we enable offstrategy
compactions: we'll then compact the SSTables generated into the main
directory.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2020-04-13 21:25:06 -04:00
parent aebd965f0e
commit 8021d12371
3 changed files with 55 additions and 4 deletions

View File

@@ -380,6 +380,58 @@ distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<d
});
}
future<>
distributed_loader::process_upload_dir(distributed<database>& db, sstring ks, sstring cf) {
seastar::thread_attributes attr;
attr.sched_group = db.local().get_streaming_scheduling_group();
return seastar::async(std::move(attr), [&db, ks = std::move(ks), cf = std::move(cf)] {
global_column_family_ptr global_table(db, ks, cf);
sharded<sstables::sstable_directory> directory;
auto upload = fs::path(global_table->dir()) / "upload";
directory.start(upload, 4,
sstables::sstable_directory::need_mutate_level::yes,
sstables::sstable_directory::lack_of_toc_fatal::no,
sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()),
sstables::sstable_directory::allow_loading_materialized_view::no,
[&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&directory] {
directory.stop().get();
});
lock_table(directory, db, ks, cf).get();
process_sstable_dir(directory).get();
auto highest_generation_seen = directory.map_reduce0(
std::mem_fn(&sstables::sstable_directory::highest_generation_seen),
int64_t(0),
[] (int64_t a, int64_t b) { return std::max(a, b); }
).get0();
auto shard_generation_base = highest_generation_seen / smp::count + 1;
// We still want to do our best to keep the generation numbers shard-friendly.
// Each destination shard will manage its own generation counter.
std::vector<std::atomic<int64_t>> shard_gen(smp::count);
for (shard_id s = 0; s < smp::count; ++s) {
shard_gen[s].store(shard_generation_base * smp::count + s, std::memory_order_relaxed);
}
reshard(directory, db, ks, cf, [&global_table, upload, &shard_gen] (shard_id shard) mutable {
// we need generation calculated by instance of cf at requested shard
auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed);
return global_table->make_sstable(upload.native(), gen,
global_table->get_sstables_manager().get_highest_supported_format(),
sstables::sstable::format_types::big);
}).get();
});
}
// This function will iterate through upload directory in column family,
// and will do the following for each sstable found:
// 1) Mutate sstable level to 0.
@@ -688,10 +740,6 @@ future<> distributed_loader::load_new_sstables(distributed<database>& db, distri
cf._sstables_opened_but_not_loaded.clear();
cf.trigger_compaction();
});
}).then([&db, ks, cf] () mutable {
return smp::submit_to(0, [&db, ks = std::move(ks), cf = std::move(cf)] () mutable {
distributed_loader::reshard(db, std::move(ks), std::move(cf));
});
});
}).handle_exception([&db, ks, cf] (std::exception_ptr ep) {
return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) {

View File

@@ -70,6 +70,7 @@ public:
static future<> load_new_sstables(distributed<database>& db, distributed<db::view::view_update_generator>& view_update_generator,
sstring ks, sstring cf, std::vector<sstables::entry_descriptor> new_tables);
static future<std::vector<sstables::entry_descriptor>> flush_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks, sstring ks_name, sstring cf_name);
static future<> process_upload_dir(distributed<database>& db, sstring ks_name, sstring cf_name);
static future<sstables::entry_descriptor> probe_file(distributed<database>& db, sstring sstdir, sstring fname);
static future<> populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name);

View File

@@ -2897,6 +2897,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
slogger.info("Loading new SSTables for {}.{}...", ks_name, cf_name);
return distributed_loader::process_upload_dir(_db, ks_name, cf_name).then([this, ks_name, cf_name] {
// First, we need to stop SSTable creation for that CF in all shards. This is a really horrible
// thing to do, because under normal circumnstances this can make dirty memory go up to the point
// of explosion.
@@ -2988,6 +2989,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
return distributed_loader::load_new_sstables(_db, _view_update_generator, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] {
slogger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name);
});
});
}).finally([this] {
_loading_new_sstables = false;
});