diff --git a/api/storage_service.cc b/api/storage_service.cc index 209de1842c..454d563302 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -87,20 +87,13 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) { }; } -future<> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector tables, bool enabled) { +future set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector tables, bool enabled) { if (tables.empty()) { tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); } - return ctx.db.invoke_on_all([keyspace, tables, enabled] (database& db) { - return parallel_for_each(tables, [&db, keyspace, enabled](const sstring& table) mutable { - column_family& cf = db.find_column_family(keyspace, table); - if (enabled) { - cf.enable_auto_compaction(); - } else { - cf.disable_auto_compaction(); - } - return make_ready_future<>(); - }); + + return service::get_local_storage_service().set_tables_autocompaction(keyspace, tables, enabled).then([]{ + return make_ready_future(json_void()); }); } @@ -742,17 +735,15 @@ void set_storage_service(http_context& ctx, routes& r) { ss::enable_auto_compaction.set(r, [&ctx](std::unique_ptr req) { auto keyspace = validate_keyspace(ctx, req->param); auto tables = split_cf(req->get_query_param("cf")); - return set_tables_autocompaction(ctx, keyspace, tables, true).then([]{ - return make_ready_future(json_void()); - }); + + return set_tables_autocompaction(ctx, keyspace, tables, true); }); ss::disable_auto_compaction.set(r, [&ctx](std::unique_ptr req) { auto keyspace = validate_keyspace(ctx, req->param); auto tables = split_cf(req->get_query_param("cf")); - return set_tables_autocompaction(ctx, keyspace, tables, false).then([]{ - return make_ready_future(json_void()); - }); + + return set_tables_autocompaction(ctx, keyspace, tables, false); }); ss::deliver_hints.set(r, [](std::unique_ptr req) { diff --git a/compaction_strategy.hh 
b/compaction_strategy.hh index 0cb49910e1..77833b2fac 100644 --- a/compaction_strategy.hh +++ b/compaction_strategy.hh @@ -23,6 +23,7 @@ #include #include +#include #include "schema_fwd.hh" #include "sstables/shared_sstable.hh" @@ -62,8 +63,6 @@ public: compaction_descriptor get_major_compaction_job(column_family& cf, std::vector candidates); - std::vector get_resharding_jobs(column_family& cf, std::vector candidates); - // Some strategies may look at the compacted and resulting sstables to // get some useful information for subsequent compactions. void notify_completion(const std::vector& removed, const std::vector& added); @@ -135,6 +134,20 @@ public: // Returns whether or not interposer consumer is used by a given strategy. bool use_interposer_consumer() const; + + // Informs the caller (usually the compaction manager) about what would it take for this set of + // SSTables closer to becoming in-strategy. If this returns an empty compaction descriptor, this + // means that the sstable set is already in-strategy. + // + // The caller can specify one of two modes: strict or relaxed. In relaxed mode the tolerance for + // what is considered offstrategy is higher. It can be used, for instance, for when the system + // is restarting and previous compactions were likely in-flight. In strict mode, we are less + // tolerant to invariant breakages. + // + // The caller should also pass a maximum number of SSTables which is the maximum amount of + // SSTables that can be added into a single job. + compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode); + }; // Creates a compaction_strategy object from one of the strategies available. 
diff --git a/compaction_strategy_type.hh b/compaction_strategy_type.hh index ec5728dbc5..65322ff723 100644 --- a/compaction_strategy_type.hh +++ b/compaction_strategy_type.hh @@ -32,4 +32,5 @@ enum class compaction_strategy_type { time_window, }; +enum class reshape_mode { strict, relaxed }; } diff --git a/database.hh b/database.hh index db560a8ae6..eabe780525 100644 --- a/database.hh +++ b/database.hh @@ -489,11 +489,7 @@ private: // This semaphore ensures that an operation like snapshot won't have its selected // sstables deleted by compaction in parallel, a race condition which could // easily result in failure. - // Locking order: must be acquired either independently or after _sstables_lock seastar::named_semaphore _sstable_deletion_sem = {1, named_semaphore_exception_factory{"sstable deletion"}}; - // There are situations in which we need to stop writing sstables. Flushers will take - // the read lock, and the ones that wish to stop that process will take the write lock. - rwlock _sstables_lock; mutable row_cache _cache; // Cache covers only sstables. std::optional _sstable_generation = {}; @@ -820,16 +816,6 @@ public: future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); - // Important warning: disabling writes will only have an effect in the current shard. - // The other shards will keep writing tables at will. Therefore, you very likely need - // to call this separately in all shards first, to guarantee that none of them are writing - // new data before you can safely assume that the whole node is disabled. - future disable_sstable_write(); - - // SSTable writes are now allowed again, and generation is updated to new_generation if != -1 - // returns the amount of microseconds elapsed since we disabled writes. - std::chrono::steady_clock::duration enable_sstable_write(int64_t new_generation); - // Make sure the generation numbers are sequential, starting from "start". 
// Generations before "start" are left untouched. // diff --git a/db/config.cc b/db/config.cc index 5f1096e957..4914000355 100644 --- a/db/config.cc +++ b/db/config.cc @@ -724,6 +724,8 @@ db::config::config(std::shared_ptr exts) "especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.") , max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20, "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.") + , initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u, + "Maximum amount of sstables to load in parallel during initialization. A higher number can lead to more memory consumption. You should not need to touch this") , enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false, "Set to true if the cluster was initially installed from 3.1.0. If it was upgraded from an earlier version," " or installed from a later version, leave this set to false. 
This adjusts the communication protocol to" diff --git a/db/config.hh b/db/config.hh index c726e34b10..e8867a0340 100644 --- a/db/config.hh +++ b/db/config.hh @@ -304,6 +304,7 @@ public: named_value max_partition_key_restrictions_per_query; named_value max_clustering_key_restrictions_per_query; named_value max_memory_for_unlimited_query; + named_value initial_sstable_loading_concurrency; named_value enable_3_1_0_compatibility_mode; named_value enable_user_defined_functions; named_value user_defined_function_time_limit_ms; diff --git a/distributed_loader.cc b/distributed_loader.cc index d6ea2fc78e..73870b6dd1 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -79,6 +79,10 @@ static io_error_handler error_handler_for_upload_dir() { }; } +io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy) { + return error_handler_for_upload_dir(); +} + // TODO: possibly move it to seastar template static future<> invoke_shards_with_ptr(std::unordered_set shards, distributed& s, PtrType ptr, Func&& func) { @@ -109,87 +113,6 @@ public: } }; -// checks whether or not a given column family is worth resharding by checking if any of its -// sstables has more than one owner shard. 
-static future worth_resharding(distributed& db, global_column_family_ptr cf) { - auto has_shared_sstables = [cf] (database& db) { - return cf->has_shared_sstables(); - }; - return db.map_reduce0(has_shared_sstables, bool(false), std::logical_or()); -} - -static future> -load_sstables_with_open_info(std::vector ssts_info, global_column_family_ptr cf, sstring dir, - noncopyable_function pred) { - return do_with(std::vector(), [ssts_info = std::move(ssts_info), cf, dir, pred = std::move(pred)] (auto& ssts) mutable { - return parallel_for_each(std::move(ssts_info), [&ssts, cf, dir, pred = std::move(pred)] (auto& info) mutable { - if (!pred(info)) { - return make_ready_future<>(); - } - auto sst = cf->make_sstable(dir, info.generation, info.version, info.format); - return sst->load(std::move(info)).then([&ssts, sst] { - ssts.push_back(std::move(sst)); - return make_ready_future<>(); - }); - }).then([&ssts] () mutable { - return std::move(ssts); - }); - }); -} - -// make a set of sstables available at another shard. -static future<> forward_sstables_to(shard_id shard, sstring directory, std::vector sstables, global_column_family_ptr cf, - std::function (std::vector)> func) { - return seastar::async([sstables = std::move(sstables), directory, shard, cf, func = std::move(func)] () mutable { - auto infos = boost::copy_range>(sstables - | boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); })); - - smp::submit_to(shard, [cf, func, infos = std::move(infos), directory] () mutable { - return load_sstables_with_open_info(std::move(infos), cf, directory, [] (auto& p) { - return true; - }).then([func] (std::vector sstables) { - return func(std::move(sstables)); - }); - }).get(); - }); -} - -// Return all sstables that need resharding in the system. Only one instance of a shared sstable is returned. 
-static future> get_all_shared_sstables(distributed& db, sstring sstdir, global_column_family_ptr cf) { - class all_shared_sstables { - sstring _dir; - global_column_family_ptr _cf; - std::unordered_map _result; - public: - all_shared_sstables(const sstring& sstdir, global_column_family_ptr cf) : _dir(sstdir), _cf(cf) {} - - future<> operator()(std::vector ssts_info) { - return load_sstables_with_open_info(std::move(ssts_info), _cf, _dir, [this] (auto& info) { - // skip loading of shared sstable that is already stored in _result. - return !_result.count(info.generation); - }).then([this] (std::vector sstables) { - for (auto& sst : sstables) { - auto gen = sst->generation(); - _result.emplace(gen, std::move(sst)); - } - return make_ready_future<>(); - }); - } - - std::vector get() && { - return boost::copy_range>(std::move(_result) | boost::adaptors::map_values); - } - }; - - return db.map_reduce(all_shared_sstables(sstdir, cf), [cf, sstdir] (database& db) mutable { - return seastar::async([cf, sstdir] { - return boost::copy_range>(cf->sstables_need_rewrite() - | boost::adaptors::filtered([sstdir] (auto&& sst) { return sst->get_dir() == sstdir; }) - | boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); })); - }); - }); -} - template static inline future<> verification_error(fs::path path, const char* fstr, Args&&... 
args) { @@ -343,9 +266,8 @@ future<> run_resharding_jobs(sharded& dir, std::vec } return do_with(std::move(reshard_jobs), [&dir, &db, ks_name, table_name, creator = std::move(creator), total_size] (std::vector& reshard_jobs) { - auto total_size_mb = total_size / 1000000.0; auto start = std::chrono::steady_clock::now(); - dblog.info("{}", fmt::format("Resharding {:.2f} MB", total_size_mb)); + dblog.info("{}", fmt::format("Resharding {} ", sstables::pretty_printed_data_size(total_size))); return dir.invoke_on_all([&dir, &db, &reshard_jobs, ks_name, table_name, creator] (sstables::sstable_directory& d) mutable { auto& table = db.local().find_column_family(ks_name, table_name); @@ -356,11 +278,9 @@ future<> run_resharding_jobs(sharded& dir, std::vec return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] { return d.move_foreign_sstables(dir); }); - }).then([start, total_size_mb] { - // add a microsecond to prevent division by zero - auto now = std::chrono::steady_clock::now() + 1us; - auto seconds = std::chrono::duration_cast>(now - start).count(); - dblog.info("{}", fmt::format("Resharded {:.2f} MB in {:.2f} seconds, {:.2f} MB/s", total_size_mb, seconds, (total_size_mb / seconds))); + }).then([start, total_size] { + auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); + dblog.info("{}", fmt::format("Resharded {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration))); return make_ready_future<>(); }); }); @@ -380,12 +300,99 @@ distributed_loader::reshard(sharded& dir, sharded +highest_generation_seen(sharded& directory) { + return directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), int64_t(0), [] (int64_t a, int64_t b) { + return std::max(a, b); + }); +} + +future +highest_version_seen(sharded& dir, sstables::sstable_version_types system_version) { + using version = 
sstables::sstable_version_types; + return dir.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_version_seen), system_version, [] (version a, version b) { + return sstables::is_later(a, b) ? a : b; + }); +} + future<> -distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { +distributed_loader::reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, + sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) { + + auto start = std::chrono::steady_clock::now(); + return dir.map_reduce0([&dir, &db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode] (sstables::sstable_directory& d) { + auto& table = db.local().find_column_family(ks_name, table_name); + auto& cm = table.get_compaction_manager(); + auto& iop = service::get_local_streaming_read_priority(); + return d.reshape(cm, table, creator, iop, mode); + }, uint64_t(0), std::plus()).then([start] (uint64_t total_size) { + if (total_size > 0) { + auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); + dblog.info("{}", fmt::format("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration))); + } + return make_ready_future<>(); + }); +} + +// Loads SSTables into the main directory (or staging) and returns how many were loaded +future +distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, + sharded& view_update_generator, fs::path datadir, sstring ks, sstring cf) { + + auto& table = db.local().find_column_family(ks, cf); + + return do_with(dht::ring_position::max(), dht::ring_position::min(), [&table, &dir, &view_update_generator, datadir = std::move(datadir)] (dht::ring_position& min, dht::ring_position& max) { + return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &min, &max] (sstables::shared_sstable sst) { + 
min = std::min(dht::ring_position(sst->get_first_decorated_key()), min, dht::ring_position_less_comparator(*table.schema())); + max = std::max(dht::ring_position(sst->get_last_decorated_key()) , max, dht::ring_position_less_comparator(*table.schema())); + + auto gen = table.calculate_generation_for_new_table(); + dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen); + return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, sst] { + table._sstables_opened_but_not_loaded.push_back(std::move(sst)); + return make_ready_future<>(); + }); + }).then([&table, &min, &max] { + // nothing loaded + if (min.is_max() && max.is_min()) { + return make_ready_future<>(); + } + + return table.get_row_cache().invalidate([&table] () noexcept { + for (auto& sst : table._sstables_opened_but_not_loaded) { + try { + table.load_sstable(sst, true); + } catch (...) { + dblog.error("Failed to load {}: {}. Aborting.", sst->toc_filename(), std::current_exception()); + abort(); + } + } + }, dht::partition_range::make({min, true}, {max, true})); + }).then([&view_update_generator, &table] { + return parallel_for_each(table._sstables_opened_but_not_loaded, [&view_update_generator, &table] (sstables::shared_sstable& sst) { + if (sst->requires_view_building()) { + return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); + } + return make_ready_future<>(); + }); + }).then_wrapped([&table] (future<> f) { + auto opened = std::exchange(table._sstables_opened_but_not_loaded, {}); + if (!f.failed()) { + return make_ready_future(opened.size()); + } else { + return make_exception_future(f.get_exception()); + } + }); + }); +} + +future<> +distributed_loader::process_upload_dir(distributed& db, distributed& sys_dist_ks, + distributed& view_update_generator, sstring ks, sstring cf) { seastar::thread_attributes attr; attr.sched_group = db.local().get_streaming_scheduling_group(); - return 
seastar::async(std::move(attr), [&db, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] { global_column_family_ptr global_table(db, ks, cf); sharded directory; @@ -396,7 +403,7 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()), sstables::sstable_directory::allow_loading_materialized_view::no, [&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { - return global_table->make_sstable(dir.native(), gen, v, f); + return global_table->make_sstable(dir.native(), gen, v, f, &error_handler_gen_for_upload_dir); }).get(); @@ -406,13 +413,9 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss lock_table(directory, db, ks, cf).get(); process_sstable_dir(directory).get(); - auto highest_generation_seen = directory.map_reduce0( - std::mem_fn(&sstables::sstable_directory::highest_generation_seen), - int64_t(0), - [] (int64_t a, int64_t b) { return std::max(a, b); } - ).get0(); - auto shard_generation_base = highest_generation_seen / smp::count + 1; + auto generation = highest_generation_seen(directory).get0(); + auto shard_generation_base = generation / smp::count + 1; // We still want to do our best to keep the generation numbers shard-friendly. // Each destination shard will manage its own generation counter. 
@@ -427,80 +430,30 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss return global_table->make_sstable(upload.native(), gen, global_table->get_sstables_manager().get_highest_supported_format(), - sstables::sstable::format_types::big); + sstables::sstable::format_types::big, &error_handler_gen_for_upload_dir); }).get(); - }); -} -// This function will iterate through upload directory in column family, -// and will do the following for each sstable found: -// 1) Mutate sstable level to 0. -// 2) Check if view updates need to be generated from this sstable. If so, leave it intact for now. -// 3) Otherwise, create hard links to its components in column family dir. -// 4) Remove all of its components in upload directory. -// At the end, it's expected that upload dir contains only staging sstables -// which need to wait until view updates are generated from them. -// -// Return a vector containing descriptor of sstables to be loaded. -future> -distributed_loader::flush_upload_dir(distributed& db, distributed& sys_dist_ks, sstring ks_name, sstring cf_name) { - return seastar::async([&db, &sys_dist_ks, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] { - std::unordered_map descriptors; - std::vector flushed; + reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [global_table, upload, &shard_gen] (shard_id shard) { + auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed); + return global_table->make_sstable(upload.native(), gen, + global_table->get_sstables_manager().get_highest_supported_format(), + sstables::sstable::format_types::big, + &error_handler_gen_for_upload_dir); + }).get(); - auto& cf = db.local().find_column_family(ks_name, cf_name); - auto upload_dir = fs::path(cf._config.datadir) / "upload"; - verify_owner_and_mode(upload_dir).get(); - lister::scan_dir(upload_dir, { directory_entry_type::regular }, [&descriptors] (fs::path parent_dir, directory_entry de) { - auto comps = 
sstables::entry_descriptor::make_descriptor(parent_dir.native(), de.name); - if (comps.component != component_type::TOC) { - return make_ready_future<>(); - } - descriptors.emplace(comps.generation, std::move(comps)); - return make_ready_future<>(); - }, &sstables::manifest_json_filter).get(); + const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0(); - flushed.reserve(descriptors.size()); - for (auto& [generation, comps] : descriptors) { - auto descriptors = db.invoke_on(column_family::calculate_shard_from_sstable_generation(generation), [&sys_dist_ks, ks_name, cf_name, comps] (database& db) { - return seastar::async([&db, &sys_dist_ks, ks_name = std::move(ks_name), cf_name = std::move(cf_name), comps = std::move(comps)] () mutable { - auto& cf = db.find_column_family(ks_name, cf_name); - auto sst = cf.make_sstable(cf._config.datadir + "/upload", comps.generation, comps.version, comps.format, - [] (disk_error_signal_type&) { return error_handler_for_upload_dir(); }); - auto gen = cf.calculate_generation_for_new_table(); - - sst->read_toc().get(); - schema_ptr s = cf.schema(); - if (s->is_counter() && !sst->has_scylla_component()) { - sstring error = "Direct loading non-Scylla SSTables containing counters is not supported."; - if (db.get_config().enable_dangerous_direct_import_of_cassandra_counters()) { - dblog.info("{} But trying to continue on user's request.", error); - } else { - dblog.error("{} Use sstableloader instead.", error); - throw std::runtime_error(fmt::format("{} Use sstableloader instead.", error)); - } - } - if (s->is_view()) { - throw std::runtime_error("Loading Materialized View SSTables is not supported. 
Re-create the view instead."); - } - sst->mutate_sstable_level(0).get(); - const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), cf, streaming::stream_reason::repair).get0(); - sstring datadir = cf._config.datadir; - if (use_view_update_path) { - // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. - datadir += "/staging"; - } - sst->create_links(datadir, gen).get(); - sstables::remove_by_toc_name(sst->toc_filename(), error_handler_for_upload_dir()).get(); - comps.generation = gen; - comps.sstdir = std::move(datadir); - return std::move(comps); - }); - }).get0(); - - flushed.push_back(std::move(descriptors)); + auto datadir = upload.parent_path(); + if (use_view_update_path) { + // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. + datadir /= "staging"; } - return std::vector(std::move(flushed)); + + size_t loaded = directory.map_reduce0([&db, ks, cf, datadir, &view_update_generator] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, view_update_generator, datadir, ks, cf); + }, size_t(0), std::plus()).get0(); + + dblog.info("Loaded {} SSTables into {}", loaded, datadir.native()); }); } @@ -552,205 +505,6 @@ future<> distributed_loader::open_sstable(distributed& db, sstables::e }); } -// invokes each descriptor at its target shard, which involves forwarding sstables too. 
-static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, sstring directory, std::vector jobs, - std::function (std::vector, uint32_t level, uint64_t max_sstable_bytes)> func) { - return parallel_for_each(std::move(jobs), [cf, func, &directory] (sstables::resharding_descriptor& job) mutable { - return forward_sstables_to(job.reshard_at, directory, std::move(job.sstables), cf, - [cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) { - // compaction manager ensures that only one reshard operation will run per shard. - auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable { - return func(std::move(sstables), level, max_sstable_bytes); - }; - return cf->get_compaction_manager().run_resharding_job(&*cf, std::move(job)); - }); - }); -} - -static std::vector sstables_for_shard(const std::vector& sstables, shard_id shard) { - auto belongs_to_shard = [] (const sstables::shared_sstable& sst, unsigned shard) { - auto& shards = sst->get_shards_for_this_sstable(); - return boost::range::find(shards, shard) != shards.end(); - }; - - return boost::copy_range>(sstables - | boost::adaptors::filtered([&] (auto& sst) { return belongs_to_shard(sst, shard); })); -} - -void distributed_loader::reshard(distributed& db, sstring ks_name, sstring cf_name) { - assert(this_shard_id() == 0); // NOTE: should always run on shard 0! - - // ensures that only one column family is resharded at a time (that's okay because - // actual resharding is parallelized), and that's needed to prevent the same column - // family from being resharded in parallel (that could happen, for example, if - // refresh (triggers resharding) is issued by user while resharding is going on). - static semaphore sem(1); - - // FIXME: discarded future. 
- (void)with_semaphore(sem, 1, [&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable { - return seastar::async([&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable { - global_column_family_ptr cf(db, ks_name, cf_name); - - if (!cf->get_compaction_manager().enabled()) { - return; - } - // fast path to detect that this column family doesn't need reshard. - if (!worth_resharding(db, cf).get0()) { - dblog.debug("Nothing to reshard for {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name()); - return; - } - - parallel_for_each(cf->_config.all_datadirs, [&db, cf] (const sstring& directory) { - auto candidates = get_all_shared_sstables(db, directory, cf).get0(); - dblog.debug("{} candidates for resharding for {}.{}", candidates.size(), cf->schema()->ks_name(), cf->schema()->cf_name()); - auto jobs = cf->get_compaction_strategy().get_resharding_jobs(*cf, std::move(candidates)); - dblog.debug("{} resharding jobs for {}.{}", jobs.size(), cf->schema()->ks_name(), cf->schema()->cf_name()); - - return invoke_all_resharding_jobs(cf, directory, std::move(jobs), [directory, &cf] (auto sstables, auto level, auto max_sstable_bytes) { - // FIXME: run it in maintenance priority. - // Resharding, currently, cannot provide compaction with a snapshot of the sstable set - // which spans all shards that input sstables belong to, so expiration is disabled. - std::optional sstable_set = std::nullopt; - sstables::compaction_descriptor descriptor(sstables, std::move(sstable_set), service::get_local_compaction_priority(), - level, max_sstable_bytes); - descriptor.options = sstables::compaction_options::make_reshard(); - descriptor.creator = [&cf, directory] (shard_id shard) mutable { - // we need generation calculated by instance of cf at requested shard, - // or resource usage wouldn't be fairly distributed among shards. 
- auto gen = smp::submit_to(shard, [&cf] () { - return cf->calculate_generation_for_new_table(); - }).get0(); - - return cf->make_sstable(directory, gen, - cf->get_sstables_manager().get_highest_supported_format(), - sstables::sstable::format_types::big); - }; - auto f = sstables::compact_sstables(std::move(descriptor), *cf); - - return f.then([&cf, sstables = std::move(sstables), directory] (sstables::compaction_info info) mutable { - auto new_sstables = std::move(info.new_sstables); - // an input sstable may belong to shard 1 and 2 and only have data which - // token belongs to shard 1. That means resharding will only create a - // sstable for shard 1, but both shards opened the sstable. So our code - // below should ask both shards to remove the resharded table, or it - // wouldn't be deleted by our deletion manager, and resharding would be - // triggered again in the subsequent boot. - return parallel_for_each(boost::irange(0u, smp::count), [&cf, directory, sstables, new_sstables] (auto shard) { - auto old_sstables_for_shard = sstables_for_shard(sstables, shard); - // nothing to do if no input sstable belongs to this shard. 
- if (old_sstables_for_shard.empty()) { - return make_ready_future<>(); - } - auto new_sstables_for_shard = sstables_for_shard(new_sstables, shard); - // sanity checks - for (auto& sst : new_sstables_for_shard) { - auto& shards = sst->get_shards_for_this_sstable(); - if (shards.size() != 1) { - throw std::runtime_error(format("resharded sstable {} doesn't belong to only one shard", sst->get_filename())); - } - if (shards.front() != shard) { - throw std::runtime_error(format("resharded sstable {} should belong to shard {:d}", sst->get_filename(), shard)); - } - } - - std::unordered_set ancestors; - boost::range::transform(old_sstables_for_shard, std::inserter(ancestors, ancestors.end()), - std::mem_fn(&sstables::sstable::generation)); - - if (new_sstables_for_shard.empty()) { - // handles case where sstable needing rewrite doesn't produce any sstable - // for a shard it belongs to when resharded (the reason is explained above). - return smp::submit_to(shard, [cf, ancestors = std::move(ancestors)] () mutable { - return cf->remove_ancestors_needed_rewrite(ancestors); - }); - } else { - return forward_sstables_to(shard, directory, new_sstables_for_shard, cf, [cf, ancestors = std::move(ancestors)] (std::vector sstables) mutable { - return cf->replace_ancestors_needed_rewrite(std::move(ancestors), std::move(sstables)); - }); - } - }).then([&cf, sstables] { - // schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards. - (void)sstables::delete_atomically(sstables).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) { - try { - std::rethrow_exception(eptr); - } catch (...) 
{ - dblog.warn("Exception in resharding when deleting sstable file: {}", eptr); - } - }).then([cf, sstables = std::move(sstables)] { - // Refresh cache's snapshot of shards involved in resharding to prevent the cache from - // holding reference to deleted files which results in disk space not being released. - std::unordered_set owner_shards; - for (auto& sst : sstables) { - const auto& shards = sst->get_shards_for_this_sstable(); - owner_shards.insert(shards.begin(), shards.end()); - if (owner_shards.size() == smp::count) { - break; - } - } - return parallel_for_each(std::move(owner_shards), [cf] (shard_id shard) { - return smp::submit_to(shard, [cf] () mutable { - cf->_cache.refresh_snapshot(); - }); - }); - }); - }); - }); - }); - }).get(); - }); - }); -} - -future<> distributed_loader::load_new_sstables(distributed& db, distributed& view_update_generator, - sstring ks, sstring cf, std::vector new_tables) { - return parallel_for_each(new_tables, [&] (auto comps) { - auto cf_sstable_open = [comps] (column_family& cf, sstables::foreign_sstable_open_info info) { - auto f = cf.open_sstable(std::move(info), comps.sstdir, comps.generation, comps.version, comps.format); - return f.then([&cf] (sstables::shared_sstable sst) mutable { - if (sst) { - cf._sstables_opened_but_not_loaded.push_back(sst); - } - return make_ready_future<>(); - }); - }; - return distributed_loader::open_sstable(db, comps, cf_sstable_open, service::get_local_compaction_priority()) - .handle_exception([comps, ks, cf] (std::exception_ptr ep) { - auto name = sstables::sstable::filename(comps.sstdir, ks, cf, comps.version, comps.generation, comps.format, sstables::component_type::TOC); - dblog.error("Failed to open {}: {}", name, ep); - return make_exception_future<>(ep); - }); - }).then([&db, &view_update_generator, ks, cf] { - return db.invoke_on_all([&view_update_generator, ks = std::move(ks), cfname = std::move(cf)] (database& db) { - auto& cf = db.find_column_family(ks, cfname); - return 
cf.get_row_cache().invalidate([&view_update_generator, &cf] () noexcept { - // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. - // atomically load all opened sstables into column family. - for (auto& sst : cf._sstables_opened_but_not_loaded) { - try { - cf.load_sstable(sst, true); - } catch(...) { - dblog.error("Failed to load {}: {}. Aborting.", sst->toc_filename(), std::current_exception()); - abort(); - } - if (sst->requires_view_building()) { - // FIXME: discarded future. - (void)view_update_generator.local().register_staging_sstable(sst, cf.shared_from_this()); - } - } - cf._sstables_opened_but_not_loaded.clear(); - cf.trigger_compaction(); - }); - }); - }).handle_exception([&db, ks, cf] (std::exception_ptr ep) { - return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) { - auto& cf = db.find_column_family(ks, cfname); - cf._sstables_opened_but_not_loaded.clear(); - }).then([ep] { - return make_exception_future<>(ep); - }); - }); -} - future distributed_loader::probe_file(distributed& db, sstring sstdir, sstring fname) { using namespace sstables; @@ -872,105 +626,71 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele }); } -future<> distributed_loader::do_populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { - // We can catch most errors when we try to load an sstable. But if the TOC - // file is the one missing, we won't try to load the sstable at all. This - // case is still an invalid case, but it is way easier for us to treat it - // by waiting for all files to be loaded, and then checking if we saw a - // file during scan_dir, without its corresponding TOC. 
- enum class component_status { - has_some_file, - has_toc_file, - has_temporary_toc_file, - }; - - struct sstable_descriptor { - component_status status; - sstables::sstable::version_types version; - sstables::sstable::format_types format; - }; - - auto verifier = make_lw_shared>(); - - return do_with(std::vector>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector>& futures) { - return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) { - // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") - - // push future returned by probe_file into an array of futures, - // so that the supplied callback will not block scan_dir() from - // reading the next entry in the directory. - auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) { - if (entry.component == component_type::TemporaryStatistics) { - return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation, - entry.format, component_type::TemporaryStatistics)); - } - - if (verifier->count(entry.generation)) { - if (verifier->at(entry.generation).status == component_status::has_toc_file) { - fs::path file_path(sstdir / de.name); - if (entry.component == component_type::TOC) { - throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed", file_path.native()); - } else if (entry.component == component_type::TemporaryTOC) { - throw sstables::malformed_sstable_exception("Invalid State encountered. 
Temporary TOC file found after TOC file was processed", file_path.native()); - } - } else if (entry.component == component_type::TOC) { - verifier->at(entry.generation).status = component_status::has_toc_file; - } else if (entry.component == component_type::TemporaryTOC) { - verifier->at(entry.generation).status = component_status::has_temporary_toc_file; - } - } else { - if (entry.component == component_type::TOC) { - verifier->emplace(entry.generation, sstable_descriptor{component_status::has_toc_file, entry.version, entry.format}); - } else if (entry.component == component_type::TemporaryTOC) { - verifier->emplace(entry.generation, sstable_descriptor{component_status::has_temporary_toc_file, entry.version, entry.format}); - } else { - verifier->emplace(entry.generation, sstable_descriptor{component_status::has_some_file, entry.version, entry.format}); - } - } - return make_ready_future<>(); - }); - - futures.push_back(std::move(f)); - - return make_ready_future<>(); - }, &sstables::manifest_json_filter).then([&futures] { - return execute_futures(futures); - }).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] { - return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) { - if (v.second.status == component_status::has_temporary_toc_file) { - unsigned long gen = v.first; - sstables::sstable::version_types version = v.second.version; - sstables::sstable::format_types format = v.second.format; - - if (this_shard_id() != 0) { - dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first); - return make_ready_future<>(); - } - // shard 0 is the responsible for removing a partial sstable. 
- return sstables::sstable::remove_sstable_with_temp_toc(ks, cf, sstdir, gen, version, format); - } else if (v.second.status != component_status::has_toc_file) { - throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable with generation {:d}!. Refusing to boot", sstdir, v.first)); - } - return make_ready_future<>(); - }); - }); - }); - -} - future<> distributed_loader::populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] { + assert(this_shard_id() == 0); // First pass, cleanup temporary sstable directories and sstables pending delete. - if (this_shard_id() == 0) { - cleanup_column_family_temp_sst_dirs(sstdir).get(); - auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename(); - auto exists = file_exists(pending_delete_dir).get0(); - if (exists) { - handle_sstables_pending_delete(pending_delete_dir).get(); - } + cleanup_column_family_temp_sst_dirs(sstdir).get(); + auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename(); + auto exists = file_exists(pending_delete_dir).get0(); + if (exists) { + handle_sstables_pending_delete(pending_delete_dir).get(); } - // Second pass, cleanup sstables with temporary TOCs and load the rest. 
- do_populate_column_family(db, std::move(sstdir), std::move(ks), std::move(cf)).get(); + + global_column_family_ptr global_table(db, ks, cf); + + sharded directory; + directory.start(fs::path(sstdir), db.local().get_config().initial_sstable_loading_concurrency(), + sstables::sstable_directory::need_mutate_level::yes, + sstables::sstable_directory::lack_of_toc_fatal::yes, + sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()), + sstables::sstable_directory::allow_loading_materialized_view::yes, + [&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + return global_table->make_sstable(dir.native(), gen, v, f); + }).get(); + + auto stop = defer([&directory] { + directory.stop().get(); + }); + + lock_table(directory, db, ks, cf).get(); + process_sstable_dir(directory).get(); + + // If we are resharding system tables before we can read them, we will not + // know which is the highest format we support: this information is itself stored + // in the system tables. In that case we'll rely on what we find on disk: we'll + // at least not downgrade any files. If we already know that we support a higher + // format than the one we see then we use that. 
+ auto sys_format = global_table->get_sstables_manager().get_highest_supported_format(); + auto sst_version = highest_version_seen(directory, sys_format).get0(); + auto generation = highest_generation_seen(directory).get0(); + + db.invoke_on_all([&global_table, generation] (database& db) { + global_table->update_sstables_known_generation(generation); + global_table->disable_auto_compaction(); + return make_ready_future<>(); + }).get(); + + reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { + auto gen = smp::submit_to(shard, [&global_table] () { + return global_table->calculate_generation_for_new_table(); + }).get0(); + + return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big); + }).get(); + + // The node is offline at this point so we are very lenient with what we consider + // offstrategy. + reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { + auto gen = global_table->calculate_generation_for_new_table(); + return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big); + }).get(); + + directory.invoke_on_all([global_table] (sstables::sstable_directory& dir) { + return dir.do_for_each_sstable([&global_table] (sstables::shared_sstable sst) { + return global_table->add_sstable_and_update_cache(sst); + }); + }).get(); }); } diff --git a/distributed_loader.hh b/distributed_loader.hh index c292995364..bf177736bb 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -28,6 +28,7 @@ #include #include #include +#include #include "seastarx.hh" #include "sstables/compaction_descriptor.hh" @@ -58,19 +59,22 @@ class migration_manager; class distributed_loader { public: + static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, + sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); static future<> reshard(sharded& dir, 
sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); static future<> process_sstable_dir(sharded& dir); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); - static void reshard(distributed& db, sstring ks_name, sstring cf_name); static future<> open_sstable(distributed& db, sstables::entry_descriptor comps, std::function (column_family&, sstables::foreign_sstable_open_info)> func, const io_priority_class& pc = default_priority_class()); static future<> verify_owner_and_mode(std::filesystem::path path); - static future<> load_new_sstables(distributed& db, distributed& view_update_generator, - sstring ks, sstring cf, std::vector new_tables); - static future> flush_upload_dir(distributed& db, distributed& sys_dist_ks, sstring ks_name, sstring cf_name); - static future<> process_upload_dir(distributed& db, sstring ks_name, sstring cf_name); + + static future make_sstables_available(sstables::sstable_directory& dir, + sharded& db, sharded& view_update_generator, + std::filesystem::path datadir, sstring ks, sstring cf); + static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, + distributed& view_update_generator, sstring ks_name, sstring cf_name); static future probe_file(distributed& db, sstring sstdir, sstring fname); static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); diff --git a/main.cc b/main.cc index 1a9a56b075..e06449a57e 100644 --- a/main.cc +++ b/main.cc @@ -762,6 +762,11 @@ int main(int ac, char** av) { dirs.init(*cfg, bool(hinted_handoff_enabled)).get(); + // We need the compaction manager ready early so we can reshard. + db.invoke_on_all([&proxy, &stop_signal] (database& db) { + db.get_compaction_manager().enable(); + }).get(); + // Initialization of a keyspace is done by shard 0 only. 
For system // keyspace, the procedure will go through the hardcoded column // families, and in each of them, it will load the sstables for all @@ -926,8 +931,11 @@ int main(int ac, char** av) { } } - db.invoke_on_all([&proxy] (database& db) { - db.get_compaction_manager().enable(); + db.invoke_on_all([] (database& db) { + for (auto& x : db.get_column_families()) { + table& t = *(x.second); + t.enable_auto_compaction(); + } }).get(); // If the same sstable is shared by several shards, it cannot be @@ -937,10 +945,10 @@ int main(int ac, char** av) { // we will have races between the compaction and loading processes // We also want to trigger regular compaction on boot. - for (auto& x : db.local().get_column_families()) { - column_family& cf = *(x.second); - distributed_loader::reshard(db, cf.schema()->ks_name(), cf.schema()->cf_name()); - } + // FIXME: temporary as this code is being replaced. I am keeping the scheduling + // group that was effectively used in the bulk of it (compaction). Soon it will become + // streaming + db.invoke_on_all([&proxy] (database& db) { for (auto& x : db.get_column_families()) { column_family& cf = *(x.second); diff --git a/service/storage_service.cc b/service/storage_service.cc index 9166a0fd31..8138238202 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2789,18 +2789,6 @@ void storage_service::add_expire_time_if_found(inet_address endpoint, int64_t ex // All the global operations are going to happen here, and just the reloading happens // in there. future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { - class max_element { - int64_t _result = 0; - public: - future<> operator()(int64_t value) { - _result = std::max(value, _result); - return make_ready_future<>(); - } - int64_t get() && { - return _result; - } - }; - if (_loading_new_sstables) { throw std::runtime_error("Already loading SSTables. 
Try again later"); } else { @@ -2809,100 +2797,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { slogger.info("Loading new SSTables for {}.{}...", ks_name, cf_name); - return distributed_loader::process_upload_dir(_db, ks_name, cf_name).then([this, ks_name, cf_name] { - // First, we need to stop SSTable creation for that CF in all shards. This is a really horrible - // thing to do, because under normal circumnstances this can make dirty memory go up to the point - // of explosion. - // - // Remember, however, that we are assuming this is going to be ran on an empty CF. In that scenario, - // stopping the SSTables should have no effect, while guaranteeing we will see no data corruption - // * in case * this is ran on a live CF. - // - // The statement above is valid at least from the Scylla side of things: it is still totally possible - // that someones just copies the table over existing ones. There isn't much we can do about it. - return _db.map_reduce(max_element(), [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.disable_sstable_write(); - }).then([this, cf_name, ks_name] (int64_t max_seen_sstable) { - // Then, we will reshuffle the tables to make sure that the generation numbers don't go too high. - // We will do all of it the same CPU, to make sure that we won't have two parallel shufflers stepping - // onto each other. - - class all_generations { - std::set _result; - public: - future<> operator()(std::set value) { - _result.insert(value.begin(), value.end()); - return make_ready_future<>(); - } - std::set get() && { - return _result; - } - }; - - // We provide to reshuffle_sstables() the generation of all existing sstables, such that it will - // easily know which sstables are new. 
- return _db.map_reduce(all_generations(), [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - std::set generations; - for (auto& p : *(cf.get_sstables())) { - generations.insert(p->generation()); - } - return make_ready_future>(std::move(generations)); - }).then([this, max_seen_sstable, ks_name, cf_name] (std::set all_generations) { - auto shard = std::hash()(cf_name) % smp::count; - return _db.invoke_on(shard, [ks_name, cf_name, max_seen_sstable, all_generations = std::move(all_generations)] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1); - }); - }); - }).then_wrapped([this, ks_name, cf_name] (future> f) { - std::vector new_tables; - std::exception_ptr eptr; - int64_t new_gen = -1; - - try { - new_tables = f.get0(); - } catch(std::exception& e) { - slogger.error("Loading of new tables failed to {}.{} due to {}", ks_name, cf_name, e.what()); - eptr = std::current_exception(); - } catch(...) 
{ - slogger.error("Loading of new tables failed to {}.{} due to unexpected reason", ks_name, cf_name); - eptr = std::current_exception(); - } - - if (new_tables.size() > 0) { - new_gen = new_tables.back().generation; - } - - slogger.debug("Now accepting writes for sstables with generation larger or equal than {}", new_gen); - return _db.invoke_on_all([ks_name, cf_name, new_gen] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - auto disabled = std::chrono::duration_cast(cf.enable_sstable_write(new_gen)).count(); - slogger.info("CF {}.{} at shard {} had SSTables writes disabled for {} usec", ks_name, cf_name, this_shard_id(), disabled); - return make_ready_future<>(); - }).then([new_tables = std::move(new_tables), eptr = std::move(eptr)] { - if (eptr) { - return make_exception_future>(eptr); - } - return make_ready_future>(std::move(new_tables)); - }); - }).then([this, ks_name, cf_name] (std::vector new_tables) { - auto f = distributed_loader::flush_upload_dir(_db, _sys_dist_ks, ks_name, cf_name); - return f.then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector new_tables_from_upload) mutable { - if (new_tables.empty() && new_tables_from_upload.empty()) { - slogger.info("No new SSTables were found for {}.{}", ks_name, cf_name); - } - // merge new sstables found in both column family and upload directories, if any. 
- new_tables.insert(new_tables.end(), new_tables_from_upload.begin(), new_tables_from_upload.end()); - return make_ready_future>(std::move(new_tables)); - }); - }).then([this, ks_name, cf_name] (std::vector new_tables) { - return distributed_loader::load_new_sstables(_db, _view_update_generator, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] { - slogger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name); - }); - }); - }).finally([this] { + return distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name).finally([this, ks_name, cf_name] { + slogger.info("Done loading new SSTables for {}.{}", ks_name, cf_name); _loading_new_sstables = false; }); } @@ -2921,6 +2817,25 @@ void storage_service::shutdown_client_servers() { } } +future<> +storage_service::set_tables_autocompaction(const sstring &keyspace, std::vector tables, bool enabled) { + if (_initialized) { + return make_exception_future<>(std::runtime_error("Too early: storage service not initialized yet")); + } + + return _db.invoke_on_all([keyspace, tables, enabled] (database& db) { + return parallel_for_each(tables, [&db, keyspace, enabled](const sstring& table) mutable { + column_family& cf = db.find_column_family(keyspace, table); + if (enabled) { + cf.enable_auto_compaction(); + } else { + cf.disable_auto_compaction(); + } + return make_ready_future<>(); + }); + }); +} + std::unordered_multimap storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht::token_range_vector& ranges) { auto my_address = get_broadcast_address(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 60231f9c99..4adf9ef9e1 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -853,6 +853,8 @@ public: */ future<> load_new_sstables(sstring ks_name, sstring cf_name); + future<> set_tables_autocompaction(const sstring &keyspace, std::vector tables, bool enabled); + template auto 
run_with_api_lock(sstring operation, Func&& func) { return get_storage_service().invoke_on(0, [operation = std::move(operation), diff --git a/sstables/compaction.cc b/sstables/compaction.cc index aac24a3959..05f6e1150f 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -74,6 +74,25 @@ namespace sstables { logging::logger clogger("compaction"); +std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) { + static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" }; + + unsigned exp = 0; + while ((data._size >= 1000) && (exp < sizeof(suffixes))) { + exp++; + data._size /= 1000; + } + + os << data._size << suffixes[exp]; + return os; +} + +std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) { + uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0; + os << pretty_printed_data_size(throughput) << "/s"; + return os; +} + static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf, sstable_set::incremental_selector& selector, const std::unordered_set& compacting_set, const dht::decorated_key& dk) { auto timestamp = api::max_timestamp; @@ -371,34 +390,6 @@ public: } }; -// Resharding doesn't really belong into any strategy, because it is not worried about laying out -// SSTables according to any strategy-specific criteria. So we will just make it proportional to -// the amount of data we still have to reshard. -// -// Although at first it may seem like we could improve this by tracking the ongoing reshard as well -// and reducing the backlog as we compact, that is not really true. Resharding is not really -// expected to get rid of data and it is usually just splitting data among shards. Whichever backlog -// we get rid of by tracking the compaction will come back as a big spike as we add this SSTable -// back to their rightful shard owners. 
-// -// So because the data is supposed to be constant, we will just add the total amount of data as the -// backlog. -class resharding_backlog_tracker final : public compaction_backlog_tracker::impl { - uint64_t _total_bytes = 0; -public: - virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { - return _total_bytes; - } - - virtual void add_sstable(sstables::shared_sstable sst) override { - _total_bytes += sst->data_size(); - } - - virtual void remove_sstable(sstables::shared_sstable sst) override { - _total_bytes -= sst->data_size(); - } -}; - class compaction { protected: column_family& _cf; @@ -583,15 +574,19 @@ private: return consumer(make_sstable_reader()); } - virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0; - virtual bool use_interposer_consumer() const = 0; + virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) { + return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); + } + + virtual bool use_interposer_consumer() const { + return _cf.get_compaction_strategy().use_interposer_consumer(); + } compaction_info finish(std::chrono::time_point started_at, std::chrono::time_point ended_at) { _info->ended_at = std::chrono::duration_cast(ended_at.time_since_epoch()).count(); auto ratio = double(_info->end_size) / double(_info->start_size); auto duration = std::chrono::duration(ended_at - started_at); // Don't report NaN or negative number. - auto throughput = duration.count() > 0 ? (double(_info->end_size) / (1024*1024)) / duration.count() : double{}; sstring new_sstables_msg; on_end_of_compaction(); @@ -605,10 +600,10 @@ private: // - add support to merge summary (message: Partition merge counts were {%s}.). // - there is no easy way, currently, to know the exact number of total partitions. // By the time being, using estimated key count. 
- sstring formatted_msg = sprint("%ld sstables to [%s]. %ld bytes to %ld (~%d%% of original) in %dms = %.2fMB/s. " \ - "~%ld total partitions merged to %ld.", - _info->sstables, new_sstables_msg, _info->start_size, _info->end_size, int(ratio * 100), - std::chrono::duration_cast(duration).count(), throughput, + sstring formatted_msg = fmt::format("{} sstables to [{}]. {} to {} (~{} of original) in {}ms = {}. " \ + "~{} total partitions merged to {}.", + _info->sstables, new_sstables_msg, pretty_printed_data_size(_info->start_size), pretty_printed_data_size(_info->end_size), int(ratio * 100), + std::chrono::duration_cast(duration).count(), pretty_printed_throughput(_info->end_size, duration), _info->total_partitions, _info->total_keys_written); report_finish(formatted_msg, ended_at); @@ -621,7 +616,7 @@ private: virtual void report_start(const sstring& formatted_msg) const = 0; virtual void report_finish(const sstring& formatted_msg, std::chrono::time_point ended_at) const = 0; - virtual void backlog_tracker_adjust_charges() = 0; + virtual void backlog_tracker_adjust_charges() { }; std::function max_purgeable_func() { if (!tombstone_expiration_enabled()) { @@ -640,9 +635,9 @@ private: }; } - virtual void on_new_partition() = 0; + virtual void on_new_partition() {} - virtual void on_end_of_compaction() = 0; + virtual void on_end_of_compaction() {}; // create a writer based on decorated key. 
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) = 0; @@ -743,6 +738,53 @@ void garbage_collected_sstable_writer::data::finish_sstable_writer() { } } +class reshape_compaction : public compaction { +public: + reshape_compaction(column_family& cf, compaction_descriptor descriptor) + : compaction(cf, std::move(descriptor)) { + _info->run_identifier = _run_identifier; + _info->type = compaction_type::Reshape; + } + + flat_mutation_reader make_sstable_reader() const override { + return ::make_local_shard_sstable_reader(_schema, + _permit, + _compacting, + query::full_partition_range, + _schema->full_slice(), + _io_priority, + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + default_read_monitor_generator()); + } + + void report_start(const sstring& formatted_msg) const override { + clogger.info("Reshaping {}", formatted_msg); + } + + void report_finish(const sstring& formatted_msg, std::chrono::time_point ended_at) const override { + clogger.info("Reshaped {}", formatted_msg); + } + + virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override { + auto sst = _sstable_creator(this_shard_id()); + setup_new_sstable(sst); + + sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer(); + cfg.max_sstable_size = _max_sstable_size; + cfg.monitor = &default_write_monitor(); + cfg.run_identifier = _run_identifier; + return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), _io_priority), sst}; + } + + virtual void stop_sstable_writer(compaction_writer* writer) override { + if (writer) { + finish_new_sstable(writer); + } + } +}; + class regular_compaction : public compaction { // sstable being currently written. 
mutable compaction_read_monitor_generator _monitor_generator; @@ -768,14 +810,6 @@ public: _monitor_generator); } - reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { - return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); - } - - bool use_interposer_consumer() const override { - return _cf.get_compaction_strategy().use_interposer_consumer(); - } - void report_start(const sstring& formatted_msg) const override { clogger.info("Compacting {}", formatted_msg); } @@ -1185,7 +1219,6 @@ flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_co class resharding_compaction final : public compaction { shard_id _shard; // shard of current sstable writer - compaction_backlog_tracker _resharding_backlog_tracker; // Partition count estimation for a shard S: // @@ -1215,14 +1248,10 @@ private: public: resharding_compaction(column_family& cf, sstables::compaction_descriptor descriptor) : compaction(cf, std::move(descriptor)) - , _resharding_backlog_tracker(std::make_unique()) , _estimation_per_shard(smp::count) , _run_identifiers(smp::count) { - cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker); for (auto& sst : _sstables) { - _resharding_backlog_tracker.add_sstable(sst); - const auto& shards = sst->get_shards_for_this_sstable(); auto size = sst->bytes_on_disk(); auto estimated_partitions = sst->get_estimated_key_count(); @@ -1237,11 +1266,7 @@ public: _info->type = compaction_type::Reshard; } - ~resharding_compaction() { - for (auto& s : _sstables) { - _resharding_backlog_tracker.remove_sstable(s); - } - } + ~resharding_compaction() { } // Use reader that makes sure no non-local mutation will not be filtered out. 
flat_mutation_reader make_sstable_reader() const override { @@ -1284,6 +1309,7 @@ public: sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer(); cfg.max_sstable_size = _max_sstable_size; + cfg.monitor = &default_write_monitor(); // sstables generated for a given shard will share the same run identifier. cfg.run_identifier = _run_identifiers.at(shard); return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(shard), cfg, get_encoding_stats(), _io_priority, shard), sst}; @@ -1320,7 +1346,14 @@ future compaction::run(std::unique_ptr c, GCConsume compaction_type compaction_options::type() const { // Maps options_variant indexes to the corresponding compaction_type member. - static const compaction_type index_to_type[] = {compaction_type::Compaction, compaction_type::Cleanup, compaction_type::Upgrade, compaction_type::Scrub, compaction_type::Reshard}; + static const compaction_type index_to_type[] = { + compaction_type::Compaction, + compaction_type::Cleanup, + compaction_type::Upgrade, + compaction_type::Scrub, + compaction_type::Reshard, + compaction_type::Reshape, + }; return index_to_type[_options.index()]; } @@ -1329,6 +1362,9 @@ static std::unique_ptr make_compaction(column_family& cf, sstables:: column_family& cf; sstables::compaction_descriptor&& descriptor; + std::unique_ptr operator()(compaction_options::reshape) { + return std::make_unique(cf, std::move(descriptor)); + } std::unique_ptr operator()(compaction_options::reshard) { return std::make_unique(cf, std::move(descriptor)); } diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 187a360aa4..70516228e2 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -35,12 +35,19 @@ class flat_mutation_reader; namespace sstables { + class pretty_printed_data_size { + uint64_t _size; + public: + pretty_printed_data_size(uint64_t size) : _size(size) {} + friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size); + }; - struct 
resharding_descriptor { - std::vector sstables; - uint64_t max_sstable_bytes; - shard_id reshard_at; - uint32_t level; + class pretty_printed_throughput { + uint64_t _size; + std::chrono::duration _duration; + public: + pretty_printed_throughput(uint64_t size, std::chrono::duration dur) : _size(size), _duration(std::move(dur)) {} + friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput); }; static inline sstring compaction_name(compaction_type type) { @@ -59,6 +66,8 @@ namespace sstables { return "RESHARD"; case compaction_type::Upgrade: return "UPGRADE"; + case compaction_type::Reshape: + return "RESHAPE"; default: throw std::runtime_error("Invalid Compaction Type"); } diff --git a/sstables/compaction_descriptor.hh b/sstables/compaction_descriptor.hh index 5e0fa7cf72..159fd49be8 100644 --- a/sstables/compaction_descriptor.hh +++ b/sstables/compaction_descriptor.hh @@ -43,6 +43,7 @@ enum class compaction_type { Index_build = 4, Reshard = 5, Upgrade = 6, + Reshape = 7, }; struct compaction_completion_desc { @@ -72,9 +73,10 @@ public: }; struct reshard { }; - + struct reshape { + }; private: - using options_variant = std::variant; + using options_variant = std::variant; private: options_variant _options; @@ -84,6 +86,10 @@ private: } public: + static compaction_options make_reshape() { + return compaction_options(reshape{}); + } + static compaction_options make_reshard() { return compaction_options(reshard{}); } diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 563ca8cdf8..3055725f6f 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -309,7 +309,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) { return task->compaction_done.get_future().then([task] {}); } -future<> compaction_manager::run_resharding_job(column_family* cf, std::function()> job) { +future<> compaction_manager::run_custom_job(column_family* cf, sstring name, noncopyable_function()> job) { if 
(_state != state::enabled) { return make_ready_future<>(); } @@ -318,9 +318,9 @@ future<> compaction_manager::run_resharding_job(column_family* cf, std::function task->compacting_cf = cf; _tasks.push_back(task); - task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] { + task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, job = std::move(job)] () mutable { // take read lock for cf, so major compaction and resharding can't proceed in parallel. - return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] { + return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] () mutable { _stats.active_tasks++; if (!can_proceed(task)) { return make_ready_future<>(); @@ -329,20 +329,17 @@ future<> compaction_manager::run_resharding_job(column_family* cf, std::function // NOTE: // no need to register shared sstables because they're excluded from non-resharding // compaction and some of them may not even belong to current shard. - - return with_scheduling_group(_scheduling_group, [job = std::move(job)] { - return job(); - }); + return job(); }); - }).then_wrapped([this, task] (future<> f) { + }).then_wrapped([this, task, name] (future<> f) { _stats.active_tasks--; _tasks.remove(task); try { f.get(); } catch (sstables::compaction_stop_exception& e) { - cmlog.info("resharding was abruptly stopped, reason: {}", e.what()); + cmlog.info("{} was abruptly stopped, reason: {}", name, e.what()); } catch (...) 
{ - cmlog.error("resharding failed: {}", std::current_exception()); + cmlog.error("{} failed: {}", name, std::current_exception()); } }); return task->compaction_done.get_future().then([task] {}); diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index 7ea5d0dc58..bbd3b0e728 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -109,7 +109,7 @@ private: // Prevents column family from running major and minor compaction at same time. std::unordered_map _compaction_locks; - semaphore _resharding_sem{1}; + semaphore _custom_job_sem{1}; std::function compaction_submission_callback(); // all registered column families are submitted for compaction at a constant interval. @@ -216,15 +216,13 @@ public: // Submit a column family for major compaction. future<> submit_major_compaction(column_family* cf); - // Run a resharding job for a given column family. + + // Run a custom job for a given column family, defined by a function // it completes when future returned by job is ready or returns immediately // if manager was asked to stop. // - // parameter job is a function that will carry the reshard operation on a set - // of sstables that belong to different shards for this column family using - // sstables::reshard_sstables(), and in the end, it will forward unshared - // sstables created by the process to their owner shards. - future<> run_resharding_job(column_family* cf, std::function()> job); + // parameter job is a function that will carry the operation + future<> run_custom_job(column_family* cf, sstring name, noncopyable_function()> job); // Remove a column family from the compaction manager. // Cancel requests on cf and wait for a possible ongoing compaction on cf. 
diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 11bb2b6fa9..f875d32841 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -460,19 +460,6 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold; } -std::vector -compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vector candidates) { - std::vector jobs; - shard_id reshard_at_current = 0; - - clogger.debug("Trying to get resharding jobs for {}.{}...", cf.schema()->ks_name(), cf.schema()->cf_name()); - for (auto& candidate : candidates) { - auto level = candidate->get_sstable_level(); - jobs.push_back(resharding_descriptor{{std::move(candidate)}, std::numeric_limits::max(), reshard_at_current++ % smp::count, level}); - } - return jobs; -} - uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) { return partition_estimate; } @@ -481,6 +468,11 @@ reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutatio return end_consumer; } +compaction_descriptor +compaction_strategy_impl::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + return compaction_descriptor(); +} + } // namespace sstables size_tiered_backlog_tracker::inflight_component @@ -980,10 +972,6 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(column_famil return _compaction_strategy_impl->get_major_compaction_job(cf, std::move(candidates)); } -std::vector compaction_strategy::get_resharding_jobs(column_family& cf, std::vector candidates) { - return _compaction_strategy_impl->get_resharding_jobs(cf, std::move(candidates)); -} - void compaction_strategy::notify_completion(const std::vector& removed, const std::vector& added) { _compaction_strategy_impl->notify_completion(removed, added); } @@ 
-1012,6 +1000,11 @@ compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() { return _compaction_strategy_impl->get_backlog_tracker(); } +sstables::compaction_descriptor +compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, iop, mode); +} + uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) { return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); } diff --git a/sstables/compaction_strategy_impl.hh b/sstables/compaction_strategy_impl.hh index db6362ea79..f5c119e3c8 100644 --- a/sstables/compaction_strategy_impl.hh +++ b/sstables/compaction_strategy_impl.hh @@ -73,7 +73,6 @@ public: virtual ~compaction_strategy_impl() {} virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector candidates) = 0; virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector candidates); - virtual std::vector get_resharding_jobs(column_family& cf, std::vector candidates); virtual void notify_completion(const std::vector& removed, const std::vector& added) { } virtual compaction_strategy_type type() const = 0; virtual bool parallel_compaction() const { @@ -103,5 +102,7 @@ public: virtual bool use_interposer_consumer() const { return false; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode); }; } diff --git a/sstables/leveled_compaction_strategy.cc b/sstables/leveled_compaction_strategy.cc index 95769184bb..b562dc28ab 100644 --- a/sstables/leveled_compaction_strategy.cc +++ b/sstables/leveled_compaction_strategy.cc @@ -76,39 +76,6 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(colu sst->get_sstable_level(), 
_max_sstable_size_in_mb*1024*1024); } -std::vector leveled_compaction_strategy::get_resharding_jobs(column_family& cf, std::vector candidates) { - leveled_manifest manifest = leveled_manifest::create(cf, candidates, _max_sstable_size_in_mb, _stcs_options); - - std::vector descriptors; - shard_id target_shard = 0; - auto get_shard = [&target_shard] { return target_shard++ % smp::count; }; - - // Basically, we'll iterate through all levels, and for each, we'll sort the - // sstables by first key because there's a need to reshard together adjacent - // sstables. - // The shard at which the job will run is chosen in a round-robin fashion. - for (auto level = 0U; level <= manifest.get_level_count(); level++) { - uint64_t max_sstable_size = !level ? std::numeric_limits::max() : (_max_sstable_size_in_mb*1024*1024); - auto& sstables = manifest.get_level(level); - boost::sort(sstables, [] (auto& i, auto& j) { - return i->compare_by_first_key(*j) < 0; - }); - - resharding_descriptor current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level}; - - for (auto it = sstables.begin(); it != sstables.end(); it++) { - current_descriptor.sstables.push_back(*it); - - auto next = std::next(it); - if (current_descriptor.sstables.size() == smp::count || next == sstables.end()) { - descriptors.push_back(std::move(current_descriptor)); - current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level}; - } - } - } - return descriptors; -} - void leveled_compaction_strategy::notify_completion(const std::vector& removed, const std::vector& added) { if (removed.empty() || added.empty()) { return; @@ -171,4 +138,69 @@ int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family return manifest.get_estimated_tasks(); } +compaction_descriptor +leveled_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + std::array, leveled_manifest::MAX_LEVELS> 
level_info; + + auto is_disjoint = [this, schema] (const std::vector& sstables, unsigned tolerance) { + unsigned disjoint_sstables = 0; + auto prev_last = dht::ring_position::min(); + for (auto& sst : sstables) { + if (dht::ring_position(sst->get_first_decorated_key()).less_compare(*schema, prev_last)) { + disjoint_sstables++; + } + prev_last = dht::ring_position(sst->get_last_decorated_key()); + } + return disjoint_sstables > tolerance; + }; + + for (auto& sst : input) { + auto sst_level = sst->get_sstable_level(); + if (sst_level >= leveled_manifest::MAX_LEVELS) { + leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS); + + // This is really unexpected, so we'll just compact it all to fix it + compaction_descriptor desc(std::move(input), std::optional(), iop, leveled_manifest::MAX_LEVELS - 1, _max_sstable_size_in_mb * 1024 * 1024); + desc.options = compaction_options::make_reshape(); + return desc; + } + level_info[sst_level].push_back(sst); + } + + for (auto& level : level_info) { + std::sort(level.begin(), level.end(), [this, schema] (shared_sstable a, shared_sstable b) { + return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key())); + }); + } + + unsigned max_filled_level = 0; + + size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); + size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); + unsigned tolerance = mode == reshape_mode::strict ?
0 : leveled_manifest::leveled_fan_out * 2; + + if (level_info[0].size() > offstrategy_threshold) { + level_info[0].resize(std::min(level_info[0].size(), max_sstables)); + compaction_descriptor desc(std::move(level_info[0]), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + + for (unsigned level = 1; level < leveled_manifest::MAX_LEVELS; ++level) { + if (level_info[level].empty()) { + continue; + } + max_filled_level = std::max(max_filled_level, level); + + if (!is_disjoint(level_info[level], tolerance)) { + // Unfortunately no good limit to limit input size to max_sstables for LCS major + compaction_descriptor desc(std::move(input), std::optional(), iop, max_filled_level, _max_sstable_size_in_mb * 1024 * 1024); + desc.options = compaction_options::make_reshape(); + return desc; + } + } + + return compaction_descriptor(); +} + } diff --git a/sstables/leveled_compaction_strategy.hh b/sstables/leveled_compaction_strategy.hh index 53dcd47df5..82656667f9 100644 --- a/sstables/leveled_compaction_strategy.hh +++ b/sstables/leveled_compaction_strategy.hh @@ -41,8 +41,6 @@ public: virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector candidates) override; - virtual std::vector get_resharding_jobs(column_family& cf, std::vector candidates) override; - virtual void notify_completion(const std::vector& removed, const std::vector& added) override; // for each level > 0, get newest sstable and use its last key as last @@ -63,6 +61,8 @@ public: virtual compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; }; } diff --git a/sstables/size_tiered_compaction_strategy.cc b/sstables/size_tiered_compaction_strategy.cc index 62e82b58b3..190b9d2f3f 100644 --- a/sstables/size_tiered_compaction_strategy.cc +++ 
b/sstables/size_tiered_compaction_strategy.cc @@ -208,4 +208,25 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) +{ + size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); + size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); + + if (mode == reshape_mode::relaxed) { + offstrategy_threshold = max_sstables; + } + + for (auto& bucket : get_buckets(input)) { + if (bucket.size() >= offstrategy_threshold) { + bucket.resize(std::min(max_sstables, bucket.size())); + compaction_descriptor desc(std::move(bucket), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + } + + return compaction_descriptor(); +} + } diff --git a/sstables/size_tiered_compaction_strategy.hh b/sstables/size_tiered_compaction_strategy.hh index e6f23ae83b..5ca079e75f 100644 --- a/sstables/size_tiered_compaction_strategy.hh +++ b/sstables/size_tiered_compaction_strategy.hh @@ -168,6 +168,9 @@ public: virtual compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; + }; } diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 1589799832..4422737cf7 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -26,6 +26,7 @@ #include "log.hh" #include "sstable_directory.hh" #include "lister.hh" +#include "database.hh" static logging::logger dirlog("sstable_directory"); @@ -187,7 +188,7 @@ sstable_directory::process_sstable_dir(const ::io_priority_class& iop) { return std::max(a, b); }); - dirlog.debug("{} After {} scanned, seen generation {}. {} descriptors found, {} different files found ", + dirlog.debug("After {} scanned, seen generation
{} descriptors found, {} different files found ", _sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size()); // _descriptors is everything with a TOC. So after we remove this, what's left is @@ -232,7 +233,7 @@ sstable_directory::move_foreign_sstables(sharded& source_dire } // Should be empty, since an SSTable that belongs to this shard is not remote. assert(shard_id != this_shard_id()); - dirlog.debug("{} Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id); + dirlog.debug("Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id); return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec)); }); } @@ -278,6 +279,83 @@ sstable_directory::collect_output_sstables_from_resharding(std::vector +sstable_directory::remove_input_sstables_from_reshaping(std::vector sstlist) { + // When removing input sstables from reshaping: Those SSTables used to be in the unshared local + // list. So not only do we have to remove them, we also have to update the list. Because we're + // dealing with a vector it's easier to just reconstruct the list. + dirlog.debug("Removing {} reshaped SSTables", sstlist.size()); + return do_with(std::move(sstlist), std::unordered_set(), + [this] (std::vector& sstlist, std::unordered_set& exclude) { + + for (auto& sst : sstlist) { + exclude.insert(sst); + } + + auto old = std::exchange(_unshared_local_sstables, {}); + + for (auto& sst : old) { + if (!exclude.count(sst)) { + _unshared_local_sstables.push_back(sst); + } + } + + // Do this last for exception safety. If there is an exception on unlink we + // want to at least leave the SSTable unshared list in a sane state. 
+ return parallel_for_each(std::move(sstlist), [] (sstables::shared_sstable sst) { + return sst->unlink(); + }).then([] { + dirlog.debug("Finished removing all SSTables"); + }); + }); +} + + +future<> +sstable_directory::collect_output_sstables_from_reshaping(std::vector reshaped_sstables) { + dirlog.debug("Collecting {} reshaped SSTables", reshaped_sstables.size()); + return parallel_for_each(std::move(reshaped_sstables), [this] (sstables::shared_sstable sst) { + _unshared_local_sstables.push_back(std::move(sst)); + return make_ready_future<>(); + }); +} + +future sstable_directory::reshape(compaction_manager& cm, table& table, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop, sstables::reshape_mode mode) +{ + return do_with(uint64_t(0), [this, &cm, &table, creator = std::move(creator), iop, mode] (uint64_t & reshaped_size) mutable { + return repeat([this, &cm, &table, creator = std::move(creator), iop, &reshaped_size, mode] () mutable { + auto desc = table.get_compaction_strategy().get_reshaping_job(_unshared_local_sstables, table.schema(), iop, mode); + if (desc.sstables.empty()) { + return make_ready_future(stop_iteration::yes); + } + + if (!reshaped_size) { + dirlog.info("Found SSTables that need reshape.
Starting reshape process"); + } + + std::vector sstlist; + for (auto& sst : desc.sstables) { + reshaped_size += sst->data_size(); + sstlist.push_back(sst); + } + + desc.creator = creator; + + return cm.run_custom_job(&table, "reshape", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] () mutable { + return sstables::compact_sstables(std::move(desc), table).then([this, sstlist = std::move(sstlist)] (sstables::compaction_info result) mutable { + return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable { + return collect_output_sstables_from_reshaping(std::move(new_sstables)); + }); + }); + }).then([] { + return make_ready_future(stop_iteration::no); + }); + }).then([&reshaped_size] { + return make_ready_future(reshaped_size); + }); + }); +} + future<> sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& cm, table& table, unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop) @@ -309,7 +387,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. 
But only one will run in parallel at a time return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector& sstlist) mutable { - return cm.run_resharding_job(&table, [this, iop, &cm, &table, creator, &sstlist] () { + return cm.run_custom_job(&table, "resharding", [this, iop, &cm, &table, creator, &sstlist] () { sstables::compaction_descriptor desc(sstlist, {}, iop); desc.options = sstables::compaction_options::make_reshard(); desc.creator = std::move(creator); diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 829d155364..a2f33d7493 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -101,7 +101,7 @@ private: // SSTables that are unshared and belong to this shard. They are already stored as an // SSTable object. - utils::chunked_vector _unshared_local_sstables; + std::vector _unshared_local_sstables; // SSTables that are unshared and belong to foreign shards. Because they are more conveniently // stored as a foreign_sstable_open_info object, they are in a different attribute separate from the @@ -122,6 +122,9 @@ private: future<> remove_input_sstables_from_resharding(const std::vector& sstlist); future<> collect_output_sstables_from_resharding(std::vector resharded_sstables); + future<> remove_input_sstables_from_reshaping(std::vector sstlist); + future<> collect_output_sstables_from_reshaping(std::vector reshaped_sstables); + template future<> parallel_for_each_restricted(Container&& C, Func&& func); future<> load_foreign_sstables(sstable_info_vector info_vec); @@ -174,6 +177,12 @@ public: unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop); + // reshapes a collection of SSTables, and returns the total amount of bytes reshaped. 
+ future reshape(compaction_manager& cm, table& table, + sstables::compaction_sstable_creator_fn creator, + const ::io_priority_class& iop, + sstables::reshape_mode mode); + // Store a phased operation. Usually used to keep an object alive while the directory is being // processed. One example is preventing table drops concurrent to the processing of this // directory. diff --git a/sstables/time_window_compaction_strategy.cc b/sstables/time_window_compaction_strategy.cc index b224d38e1f..d7b8b4ee56 100644 --- a/sstables/time_window_compaction_strategy.cc +++ b/sstables/time_window_compaction_strategy.cc @@ -80,4 +80,49 @@ reader_consumer time_window_compaction_strategy::make_interposer_consumer(const }; } +compaction_descriptor +time_window_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + std::vector single_window; + std::vector multi_window; + + size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); + size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); + + if (mode == reshape_mode::relaxed) { + offstrategy_threshold = max_sstables; + } + + for (auto& sst : input) { + auto min = sst->get_stats_metadata().min_timestamp; + auto max = sst->get_stats_metadata().max_timestamp; + if (get_window_for(_options, min) != get_window_for(_options, max)) { + multi_window.push_back(sst); + } else { + single_window.push_back(sst); + } + } + + if (!multi_window.empty()) { + // Everything that spans multiple windows will need reshaping + multi_window.resize(std::min(multi_window.size(), max_sstables)); + compaction_descriptor desc(std::move(multi_window), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + + // For things that don't span multiple windows, we compact windows that are individually too big + auto all_buckets = get_buckets(single_window, _options); + for (auto& pair : 
all_buckets.first) { + auto ssts = std::move(pair.second); + if (ssts.size() > offstrategy_threshold) { + ssts.resize(std::min(ssts.size(), max_sstables)); + compaction_descriptor desc(std::move(ssts), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + } + + return compaction_descriptor(); +} + } diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh index d07e44dee2..31d2852212 100644 --- a/sstables/time_window_compaction_strategy.hh +++ b/sstables/time_window_compaction_strategy.hh @@ -351,6 +351,8 @@ public: virtual bool use_interposer_consumer() const override { return true; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; }; } diff --git a/table.cc b/table.cc index 933ec4c8c4..87222e72fa 100644 --- a/table.cc +++ b/table.cc @@ -825,49 +825,47 @@ table::seal_active_streaming_memtable_immediate(flush_permit&& permit) { auto guard = _streaming_flush_phaser.start(); return with_gate(_streaming_flush_gate, [this, old, permit = std::move(permit)] () mutable { - return with_lock(_sstables_lock.for_read(), [this, old, permit = std::move(permit)] () mutable { - auto newtab = make_sstable(); + auto newtab = make_sstable(); - tlogger.debug("Flushing to {}", newtab->get_filename()); + tlogger.debug("Flushing to {}", newtab->get_filename()); - // This is somewhat similar to the main memtable flush, but with important differences. - // - // The first difference, is that we don't keep aggregate collectd statistics about this one. - // If we ever need to, we'll keep them separate statistics, but we don't want to polute the - // main stats about memtables with streaming memtables. - // - // Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the - // memtable list, since this memtable was not available for reading up until this point.
- auto fp = permit.release_sstable_write_permit(); - database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); - return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable { - auto&& priority = service::get_local_streaming_write_priority(); - sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); - cfg.backup = incremental_backups_enabled(); - return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] { - return newtab->open_data(); - }).then([this, old, newtab] () { - return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] { - auto adder = [this, newtab] { - add_sstable(newtab, {this_shard_id()}); - try_trigger_compaction(); - tlogger.debug("Flushing to {} done", newtab->get_filename()); - }; - if (cache_enabled()) { - return _cache.update_invalidating(adder, *old); - } else { - return _cache.invalidate(adder).then([old] { return old->clear_gently(); }); - } - }); - }).handle_exception([old, permit = std::move(permit), newtab] (auto ep) { - newtab->mark_for_deletion(); - tlogger.error("failed to write streamed sstable: {}", ep); - return make_exception_future<>(ep); + // This is somewhat similar to the main memtable flush, but with important differences. + // + // The first difference, is that we don't keep aggregate collectd statistics about this one. + // If we ever need to, we'll keep them separate statistics, but we don't want to polute the + // main stats about memtables with streaming memtables. + // + // Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the + // memtable list, since this memtable was not available for reading up until this point. 
+ auto fp = permit.release_sstable_write_permit(); + database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); + return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable { + auto&& priority = service::get_local_streaming_write_priority(); + sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); + cfg.backup = incremental_backups_enabled(); + return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] { + return newtab->open_data(); + }).then([this, old, newtab] () { + return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] { + auto adder = [this, newtab] { + add_sstable(newtab, {this_shard_id()}); + try_trigger_compaction(); + tlogger.debug("Flushing to {} done", newtab->get_filename()); + }; + if (cache_enabled()) { + return _cache.update_invalidating(adder, *old); + } else { + return _cache.invalidate(adder).then([old] { return old->clear_gently(); }); + } }); + }).handle_exception([old, permit = std::move(permit), newtab] (auto ep) { + newtab->mark_for_deletion(); + tlogger.error("failed to write streamed sstable: {}", ep); + return make_exception_future<>(ep); }); - // We will also not have any retry logic. If we fail here, we'll fail the streaming and let - // the upper layers know. They can then apply any logic they want here. }); + // We will also not have any retry logic. If we fail here, we'll fail the streaming and let + // the upper layers know. They can then apply any logic they want here. 
}).finally([guard = std::move(guard)] { }); }); } @@ -882,27 +880,25 @@ future<> table::seal_active_streaming_memtable_big(streaming_memtable_big& smb, smb.memtables->erase(old); return with_gate(_streaming_flush_gate, [this, old, &smb, permit = std::move(permit)] () mutable { return with_gate(smb.flush_in_progress, [this, old, &smb, permit = std::move(permit)] () mutable { - return with_lock(_sstables_lock.for_read(), [this, old, &smb, permit = std::move(permit)] () mutable { - auto newtab = make_sstable(); + auto newtab = make_sstable(); - auto fp = permit.release_sstable_write_permit(); - auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); - auto&& priority = service::get_local_streaming_write_priority(); - sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); - cfg.backup = incremental_backups_enabled(); - cfg.leave_unsealed = true; - auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority); - return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable { - if (!f.failed()) { - smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab}); - return make_ready_future<>(); - } else { - newtab->mark_for_deletion(); - auto ep = f.get_exception(); - tlogger.error("failed to write streamed sstable: {}", ep); - return make_exception_future<>(ep); - } - }); + auto fp = permit.release_sstable_write_permit(); + auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); + auto&& priority = service::get_local_streaming_write_priority(); + sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); + cfg.backup = incremental_backups_enabled(); + cfg.leave_unsealed = true; + auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority); + return fut.then_wrapped([this, newtab, old, 
&smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable { + if (!f.failed()) { + smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab}); + return make_ready_future<>(); + } else { + newtab->mark_for_deletion(); + auto ep = f.get_exception(); + tlogger.error("failed to write streamed sstable: {}", ep); + return make_exception_future<>(ep); + } }); }); }); @@ -937,9 +933,7 @@ table::seal_active_memtable(flush_permit&& permit) { return do_with(std::move(permit), [this, old] (auto& permit) { return repeat([this, old, &permit] () mutable { auto sstable_write_permit = permit.release_sstable_write_permit(); - return with_lock(_sstables_lock.for_read(), [this, old, sstable_write_permit = std::move(sstable_write_permit)] () mutable { - return this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)); - }).then([this, &permit] (auto should_stop) mutable { + return this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)).then([this, &permit] (auto should_stop) mutable { if (should_stop) { return make_ready_future(should_stop); } @@ -1285,22 +1279,20 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) { return make_ready_future<>(); } - return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor)] () mutable { - descriptor.creator = [this] (shard_id dummy) { - auto sst = make_sstable(); - return sst; - }; - descriptor.replacer = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) { - _compaction_strategy.notify_completion(desc.old_sstables, desc.new_sstables); - _compaction_manager.propagate_replacement(this, desc.old_sstables, desc.new_sstables); - this->on_compaction_completion(desc); - if (release_exhausted) { - release_exhausted(desc.old_sstables); - } - }; + descriptor.creator = [this] (shard_id dummy) { + auto sst = make_sstable(); + return sst; + }; + descriptor.replacer = [this, release_exhausted = 
descriptor.release_exhausted] (sstables::compaction_completion_desc desc) { + _compaction_strategy.notify_completion(desc.old_sstables, desc.new_sstables); + _compaction_manager.propagate_replacement(this, desc.old_sstables, desc.new_sstables); + this->on_compaction_completion(desc); + if (release_exhausted) { + release_exhausted(desc.old_sstables); + } + }; - return sstables::compact_sstables(std::move(descriptor), *this); - }).then([this] (auto info) { + return sstables::compact_sstables(std::move(descriptor), *this).then([this] (auto info) { if (info.type != sstables::compaction_type::Compaction) { return make_ready_future<>(); } @@ -1897,79 +1889,50 @@ future<> table::clear() { future table::discard_sstables(db_clock::time_point truncated_at) { assert(_compaction_disabled > 0); - return with_lock(_sstables_lock.for_read(), [this, truncated_at] { - struct pruner { - column_family& cf; - db::replay_position rp; - std::vector remove; + struct pruner { + column_family& cf; + db::replay_position rp; + std::vector remove; - pruner(column_family& cf) - : cf(cf) {} + pruner(column_family& cf) + : cf(cf) {} - void prune(db_clock::time_point truncated_at) { - auto gc_trunc = to_gc_clock(truncated_at); + void prune(db_clock::time_point truncated_at) { + auto gc_trunc = to_gc_clock(truncated_at); - auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema)); + auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema)); - for (auto& p : *cf._sstables->all()) { - if (p->max_data_age() <= gc_trunc) { - // Only one shard that own the sstable will submit it for deletion to avoid race condition in delete procedure. 
- if (*boost::min_element(p->get_shards_for_this_sstable()) == this_shard_id()) { - rp = std::max(p->get_stats_metadata().position, rp); - remove.emplace_back(p); - } - continue; + for (auto& p : *cf._sstables->all()) { + if (p->max_data_age() <= gc_trunc) { + // Only one shard that own the sstable will submit it for deletion to avoid race condition in delete procedure. + if (*boost::min_element(p->get_shards_for_this_sstable()) == this_shard_id()) { + rp = std::max(p->get_stats_metadata().position, rp); + remove.emplace_back(p); } - pruned->insert(p); + continue; } - - cf._sstables = std::move(pruned); + pruned->insert(p); } - }; - auto p = make_lw_shared(*this); - return _cache.invalidate([p, truncated_at] { - p->prune(truncated_at); - tlogger.debug("cleaning out row cache"); - }).then([this, p]() mutable { - rebuild_statistics(); - return parallel_for_each(p->remove, [this](sstables::shared_sstable s) { - remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s); - return sstables::delete_atomically({s}); - }).then([p] { - return make_ready_future(p->rp); - }); + cf._sstables = std::move(pruned); + } + }; + auto p = make_lw_shared(*this); + return _cache.invalidate([p, truncated_at] { + p->prune(truncated_at); + tlogger.debug("cleaning out row cache"); + }).then([this, p]() mutable { + rebuild_statistics(); + + return parallel_for_each(p->remove, [this](sstables::shared_sstable s) { + remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s); + return sstables::delete_atomically({s}); + }).then([p] { + return make_ready_future(p->rp); }); }); } -future -table::disable_sstable_write() { - _sstable_writes_disabled_at = std::chrono::steady_clock::now(); - return _sstables_lock.write_lock().then([this] { - // _sstable_deletion_sem must be acquired after _sstables_lock.write_lock - return _sstable_deletion_sem.wait().then([this] { - if (_sstables->all()->empty()) { - return make_ready_future(0); - } - int64_t max = 0; 
- for (auto&& s : *_sstables->all()) { - max = std::max(max, s->generation()); - } - return make_ready_future(max); - }); - }); -} - -std::chrono::steady_clock::duration table::enable_sstable_write(int64_t new_generation) { - if (new_generation != -1) { - update_sstables_known_generation(new_generation); - } - _sstable_deletion_sem.signal(); - _sstables_lock.write_unlock(); - return std::chrono::steady_clock::now() - _sstable_writes_disabled_at; -} - void table::set_schema(schema_ptr s) { assert(s->is_counter() == _schema->is_counter()); tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}", diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 5cfc3c5312..caf1c749a7 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -169,7 +169,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) { // Create incomplete sstables in test data directory sstring ks = "system"; - sstring cf = "local-7ad54392bcdd35a684174e047860b377"; + sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f"; sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string(); auto require_exist = [] (const sstring& name, bool should_exist) { @@ -223,7 +223,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) { // Create incomplete sstables in test data directory sstring ks = "system"; - sstring cf = "local-7ad54392bcdd35a684174e047860b377"; + sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f"; sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string(); sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename(); diff --git a/test/boost/sstable_resharding_test.cc b/test/boost/sstable_resharding_test.cc index 6bf1626c90..6809093335 100644 --- a/test/boost/sstable_resharding_test.cc +++ b/test/boost/sstable_resharding_test.cc @@ -133,43 +133,6 @@ SEASTAR_TEST_CASE(sstable_resharding_test) { }); } 
-SEASTAR_THREAD_TEST_CASE(sstable_resharding_strategy_tests) { - test_env env; - - for (const auto version : all_sstable_versions) { - auto s = make_lw_shared(schema({}, "ks", "cf", {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); - auto get_sstable = [&] (int64_t gen, sstring first_key, sstring last_key) mutable { - auto sst = env.make_sstable(s, "", gen, version, sstables::sstable::format_types::big); - stats_metadata stats = {}; - stats.sstable_level = 1; - sstables::test(sst).set_values(std::move(first_key), std::move(last_key), std::move(stats)); - return sst; - }; - - column_family_for_tests cf; - - auto tokens = token_generation_for_current_shard(2); - auto stcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options()); - auto lcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - - auto sst1 = get_sstable(1, tokens[0].first, tokens[1].first); - auto sst2 = get_sstable(2, tokens[1].first, tokens[1].first); - - { - // TODO: sstable_test runs with smp::count == 1, thus we will not be able to stress it more - // until we move this test case to sstable_resharding_test. - auto descriptors = stcs.get_resharding_jobs(*cf, { sst1, sst2 }); - BOOST_REQUIRE(descriptors.size() == 2); - } - { - auto ssts = std::vector{ sst1, sst2 }; - auto descriptors = lcs.get_resharding_jobs(*cf, ssts); - auto expected_jobs = (ssts.size()+smp::count-1)/smp::count; - BOOST_REQUIRE(descriptors.size() == expected_jobs); - } - } -} - SEASTAR_TEST_CASE(sstable_is_shared_correctness) { return test_env::do_with_async([] (test_env& env) { for (const auto version : all_sstable_versions) {