diff --git a/distributed_loader.cc b/distributed_loader.cc index 1813b947b9..96f7330338 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -380,6 +380,58 @@ distributed_loader::reshard(sharded& dir, sharded +distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { + /* Processes and then reshards the sstables found in the upload subdirectory of table ks.cf. The work runs inside a seastar thread whose scheduling group is the streaming group of the local database. */ seastar::thread_attributes attr; + attr.sched_group = db.local().get_streaming_scheduling_group(); + + return seastar::async(std::move(attr), [&db, ks = std::move(ks), cf = std::move(cf)] { + global_column_family_ptr global_table(db, ks, cf); + + sharded directory; + auto upload = fs::path(global_table->dir()) / "upload"; + directory.start(upload, 4, /* NOTE(review): the meaning of the literal 4 is not visible in this patch (presumably a load-parallelism limit) - confirm against the sstable_directory constructor */ + sstables::sstable_directory::need_mutate_level::yes, + sstables::sstable_directory::lack_of_toc_fatal::no, + sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()), + sstables::sstable_directory::allow_loading_materialized_view::no, + [&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + return global_table->make_sstable(dir.native(), gen, v, f); + + }).get(); + + /* Deferred cleanup: stops the sharded directory (defer() is expected to run this when the enclosing scope exits). */ auto stop = defer([&directory] { + directory.stop().get(); + }); + + lock_table(directory, db, ks, cf).get(); + process_sstable_dir(directory).get(); + /* Reduce the per-instance highest_generation_seen values to their maximum, starting from 0. */ auto highest_generation_seen = directory.map_reduce0( + std::mem_fn(&sstables::sstable_directory::highest_generation_seen), + int64_t(0), + [] (int64_t a, int64_t b) { return std::max(a, b); } + ).get0(); + + /* (h / smp::count + 1) * smp::count is always greater than h, so every generation seeded from this base exceeds the highest one seen in the directory. */ auto shard_generation_base = highest_generation_seen / smp::count + 1; + + // We still want to do our best to keep the generation numbers shard-friendly. + // Each destination shard will manage its own generation counter.
+ std::vector> shard_gen(smp::count); + /* Seed counter s with base * smp::count + s. Combined with the fetch_add(smp::count) stride used below, every value handed out for shard s satisfies gen % smp::count == s. */ for (shard_id s = 0; s < smp::count; ++s) { + shard_gen[s].store(shard_generation_base * smp::count + s, std::memory_order_relaxed); + } + + /* The callback passed to reshard() creates an sstable in the upload directory for the requested shard, taking the next generation from the counter of that shard. */ reshard(directory, db, ks, cf, [&global_table, upload, &shard_gen] (shard_id shard) mutable { + // we need generation calculated by instance of cf at requested shard + auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed); + + return global_table->make_sstable(upload.native(), gen, + global_table->get_sstables_manager().get_highest_supported_format(), + sstables::sstable::format_types::big); + }).get(); + }); +} + // This function will iterate through upload directory in column family, // and will do the following for each sstable found: // 1) Mutate sstable level to 0. @@ -688,10 +740,6 @@ future<> distributed_loader::load_new_sstables(distributed& db, distri cf._sstables_opened_but_not_loaded.clear(); cf.trigger_compaction(); }); - }).then([&db, ks, cf] () mutable { - return smp::submit_to(0, [&db, ks = std::move(ks), cf = std::move(cf)] () mutable { - distributed_loader::reshard(db, std::move(ks), std::move(cf)); - }); }); }).handle_exception([&db, ks, cf] (std::exception_ptr ep) { return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) { diff --git a/distributed_loader.hh b/distributed_loader.hh index 687605dc54..c292995364 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -70,6 +70,7 @@ public: static future<> load_new_sstables(distributed& db, distributed& view_update_generator, sstring ks, sstring cf, std::vector new_tables); static future> flush_upload_dir(distributed& db, distributed& sys_dist_ks, sstring ks_name, sstring cf_name); + static future<> process_upload_dir(distributed& db, sstring ks_name, sstring cf_name); static future probe_file(distributed& db, sstring sstdir, sstring fname); static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); static future<> 
populate_keyspace(distributed& db, sstring datadir, sstring ks_name); diff --git a/service/storage_service.cc b/service/storage_service.cc index 8fcd167b05..3698b6c4be 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2897,6 +2897,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { slogger.info("Loading new SSTables for {}.{}...", ks_name, cf_name); + /* Run process_upload_dir() first; the existing load sequence below is chained onto its future with .then(). */ return distributed_loader::process_upload_dir(_db, ks_name, cf_name).then([this, ks_name, cf_name] { // First, we need to stop SSTable creation for that CF in all shards. This is a really horrible // thing to do, because under normal circumnstances this can make dirty memory go up to the point // of explosion. @@ -2988,6 +2989,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { return distributed_loader::load_new_sstables(_db, _view_update_generator, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] { slogger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name); }); + }); }).finally([this] { _loading_new_sstables = false; });