From 8021d1237113aaa00232035d1da8a40ca167d546 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 13 Apr 2020 21:25:06 -0400 Subject: [PATCH] load_new_sstables: reshard before scanning the upload directory In a later patch we will be able to move files directly from upload into the main directory. However, for now, for the benefit of doing this incrementally, we will first reshard in place with our new reshard infrastructure. load_new_sstables can then move the SSTables directly, without having to worry about resharding. This has the immediate benefit that the resharding happens: - in the streaming group, without affecting compaction work - without waiting for the current locks (which are held by compactions) in load_new_sstables to release. We could, at this point, just move the SSTables to the main directory right away. I am not doing this in this patch, and am opting to keep the rest of the upload process unchanged. This will be fixed later when we enable offstrategy compactions: we'll then compact the SSTables generated into the main directory. 
Signed-off-by: Glauber Costa --- distributed_loader.cc | 56 +++++++++++++++++++++++++++++++++++--- distributed_loader.hh | 1 + service/storage_service.cc | 2 ++ 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index 1813b947b9..96f7330338 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -380,6 +380,58 @@ distributed_loader::reshard(sharded& dir, sharded +distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { + seastar::thread_attributes attr; + attr.sched_group = db.local().get_streaming_scheduling_group(); + + return seastar::async(std::move(attr), [&db, ks = std::move(ks), cf = std::move(cf)] { + global_column_family_ptr global_table(db, ks, cf); + + sharded directory; + auto upload = fs::path(global_table->dir()) / "upload"; + directory.start(upload, 4, + sstables::sstable_directory::need_mutate_level::yes, + sstables::sstable_directory::lack_of_toc_fatal::no, + sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()), + sstables::sstable_directory::allow_loading_materialized_view::no, + [&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + return global_table->make_sstable(dir.native(), gen, v, f); + + }).get(); + + auto stop = defer([&directory] { + directory.stop().get(); + }); + + lock_table(directory, db, ks, cf).get(); + process_sstable_dir(directory).get(); + auto highest_generation_seen = directory.map_reduce0( + std::mem_fn(&sstables::sstable_directory::highest_generation_seen), + int64_t(0), + [] (int64_t a, int64_t b) { return std::max(a, b); } + ).get0(); + + auto shard_generation_base = highest_generation_seen / smp::count + 1; + + // We still want to do our best to keep the generation numbers shard-friendly. + // Each destination shard will manage its own generation counter. 
+ std::vector> shard_gen(smp::count); + for (shard_id s = 0; s < smp::count; ++s) { + shard_gen[s].store(shard_generation_base * smp::count + s, std::memory_order_relaxed); + } + + reshard(directory, db, ks, cf, [&global_table, upload, &shard_gen] (shard_id shard) mutable { + // we need generation calculated by instance of cf at requested shard + auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed); + + return global_table->make_sstable(upload.native(), gen, + global_table->get_sstables_manager().get_highest_supported_format(), + sstables::sstable::format_types::big); + }).get(); + }); +} + // This function will iterate through upload directory in column family, // and will do the following for each sstable found: // 1) Mutate sstable level to 0. @@ -688,10 +740,6 @@ future<> distributed_loader::load_new_sstables(distributed& db, distri cf._sstables_opened_but_not_loaded.clear(); cf.trigger_compaction(); }); - }).then([&db, ks, cf] () mutable { - return smp::submit_to(0, [&db, ks = std::move(ks), cf = std::move(cf)] () mutable { - distributed_loader::reshard(db, std::move(ks), std::move(cf)); - }); }); }).handle_exception([&db, ks, cf] (std::exception_ptr ep) { return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) { diff --git a/distributed_loader.hh b/distributed_loader.hh index 687605dc54..c292995364 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -70,6 +70,7 @@ public: static future<> load_new_sstables(distributed& db, distributed& view_update_generator, sstring ks, sstring cf, std::vector new_tables); static future> flush_upload_dir(distributed& db, distributed& sys_dist_ks, sstring ks_name, sstring cf_name); + static future<> process_upload_dir(distributed& db, sstring ks_name, sstring cf_name); static future probe_file(distributed& db, sstring sstdir, sstring fname); static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); static future<> 
populate_keyspace(distributed& db, sstring datadir, sstring ks_name); diff --git a/service/storage_service.cc b/service/storage_service.cc index 8fcd167b05..3698b6c4be 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2897,6 +2897,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { slogger.info("Loading new SSTables for {}.{}...", ks_name, cf_name); + return distributed_loader::process_upload_dir(_db, ks_name, cf_name).then([this, ks_name, cf_name] { // First, we need to stop SSTable creation for that CF in all shards. This is a really horrible // thing to do, because under normal circumnstances this can make dirty memory go up to the point // of explosion. @@ -2988,6 +2989,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { return distributed_loader::load_new_sstables(_db, _view_update_generator, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] { slogger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name); }); + }); }).finally([this] { _loading_new_sstables = false; });