diff --git a/configure.py b/configure.py
index d98bdda000..daeedfd80c 100755
--- a/configure.py
+++ b/configure.py
@@ -365,6 +365,7 @@ scylla_tests = set([
     'test/boost/schema_changes_test',
     'test/boost/sstable_conforms_to_mutation_source_test',
     'test/boost/sstable_resharding_test',
+    'test/boost/sstable_directory_test',
     'test/boost/sstable_test',
     'test/boost/storage_proxy_test',
     'test/boost/top_k_test',
diff --git a/distributed_loader.cc b/distributed_loader.cc
index 9051258f20..1813b947b9 100644
--- a/distributed_loader.cc
+++ b/distributed_loader.cc
@@ -239,6 +239,147 @@ future<> distributed_loader::verify_owner_and_mode(fs::path path) {
     });
 };
+// Processes an SSTable directory on all shards, in two phases:
+//  1. every shard runs sstable_directory::process_sstable_dir() with the streaming read
+//     priority and then calls move_foreign_sstables(dir);
+//  2. only after phase 1 has completed on all shards, every shard calls
+//     commit_directory_changes().
+//
+// `dir` is captured by reference in the continuations, so it must stay alive until the
+// returned future resolves.
+future<>
+distributed_loader::process_sstable_dir(sharded& dir) {
+    return dir.invoke_on_all([&dir] (sstables::sstable_directory& d) {
+        // Supposed to be called with the node either down or on behalf of maintenance tasks
+        // like nodetool refresh
+        return d.process_sstable_dir(service::get_local_streaming_read_priority()).then([&dir, &d] {
+            return d.move_foreign_sstables(dir);
+        });
+    }).then([&dir] {
+        return dir.invoke_on_all([&dir] (sstables::sstable_directory& d) {
+            return d.commit_directory_changes();
+        });
+    });
+}
+
+// On every shard, looks up the table ks_name.cf_name in the shard-local database and stores
+// the value returned by its write_in_progress() in that shard's sstable_directory (via
+// store_phaser()).
+//
+// `db` is captured by reference and must outlive the returned future; the names are copied
+// into the per-shard lambda.
+future<>
+distributed_loader::lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name) {
+    return dir.invoke_on_all([&db, ks_name, cf_name] (sstables::sstable_directory& d) {
+        auto& table = db.local().find_column_family(ks_name, cf_name);
+        d.store_phaser(table.write_in_progress());
+        return make_ready_future<>();
+    });
+}
+
+// Helper structure for resharding.
+//
+// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and
+// need to be resharded. Each shard will keep one such descriptor, that contains the list of
+// SSTables assigned to it, and their total size. The total size is used to make sure we are
+// fairly balancing SSTables among shards.
+struct reshard_shard_descriptor {
+    // Open-info of the shared SSTables assigned to this shard.
+    sstables::sstable_directory::sstable_info_vector info_vec;
+    // Sum of uncompressed_data_size over everything pushed into info_vec.
+    uint64_t uncompressed_data_size = 0;
+
+    // Comparator used to pick the least loaded descriptor.
+    bool total_size_smaller(const reshard_shard_descriptor& rhs) const {
+        return uncompressed_data_size < rhs.uncompressed_data_size;
+    }
+
+    uint64_t size() const {
+        return uncompressed_data_size;
+    }
+};
+
+// Collects shared SSTables from all shards and returns a vector containing them all.
+// This function assumes that the list of SSTables can be fairly big so it is careful to
+// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
+//
+// Each shard's list is obtained with retrieve_shared_sstables() and appended to a vector that
+// lives on the calling shard; the returned future resolves to that vector.
+future
+collect_all_shared_sstables(sharded& dir) {
+    return do_with(sstables::sstable_directory::sstable_info_vector(), [&dir] (sstables::sstable_directory::sstable_info_vector& info_vec) {
+        // We want to make sure that each distributed object reshards about the same amount of data.
+        // Each sharded object has its own shared SSTables. We can use a clever algorithm in which they
+        // all figure out, in a distributed fashion, which SSTables to exchange, but we'll keep it simple and move all
+        // their foreign_sstable_open_info to a coordinator (the shard who called this function). We can
+        // move in bulk and that's efficient. That shard can then distribute the work among all the
+        // others who will reshard.
+        auto coordinator = this_shard_id();
+        // We will first move all of the foreign open info to temporary storage so that we can sort
+        // them. We want to distribute bigger sstables first.
+        return dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) {
+            // info_vec is only touched from inside submit_to(coordinator, ...), i.e. on the
+            // shard that owns it.
+            return smp::submit_to(coordinator, [&info_vec, info = d.retrieve_shared_sstables()] () mutable {
+                // We want do_for_each here instead of a loop to avoid stalls. Resharding can be
+                // called during node operations too. For example, if it is called to load new
+                // SSTables into the system.
+                return do_for_each(info, [&info_vec] (sstables::foreign_sstable_open_info& info) {
+                    info_vec.push_back(std::move(info));
+                });
+            });
+        }).then([&info_vec] () mutable {
+            return make_ready_future(std::move(info_vec));
+        });
+    });
+}
+
+// Given a vector of shared sstables to be resharded, distribute it among all shards.
+// The vector is first sorted to make sure that we are moving the biggest SSTables first.
+//
+// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do.
+//
+// Assignment is greedy: each SSTable, in descending size order, goes to the descriptor that
+// currently has the smallest accumulated uncompressed_data_size.
+future>
+distribute_reshard_jobs(sstables::sstable_directory::sstable_info_vector source) {
+    return do_with(std::move(source), std::vector(smp::count),
+            [] (sstables::sstable_directory::sstable_info_vector& source, std::vector& destinations) mutable {
+        std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
+            // Sort on descending SSTable sizes.
+            return a.uncompressed_data_size > b.uncompressed_data_size;
+        });
+        return do_for_each(source, [&destinations] (sstables::foreign_sstable_open_info& info) mutable {
+            // Least loaded shard so far.
+            auto shard_it = boost::min_element(destinations, std::mem_fn(&reshard_shard_descriptor::total_size_smaller));
+            shard_it->uncompressed_data_size += info.uncompressed_data_size;
+            shard_it->info_vec.push_back(std::move(info));
+        }).then([&destinations] () mutable {
+            return make_ready_future>(std::move(destinations));
+        });
+    });
+}
+
+// Runs the per-shard resharding work described by reshard_jobs (indexed by shard id).
+//
+// Returns immediately when the total size of all jobs is zero. Otherwise every shard takes
+// its own entry from reshard_jobs, calls sstable_directory::reshard() on it and then
+// move_foreign_sstables(dir). The amount of data and the achieved throughput are logged.
+//
+// `dir` and `db` are captured by reference and must outlive the returned future.
+future<> run_resharding_jobs(sharded& dir, std::vector reshard_jobs,
+        sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) {
+
+    uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0));
+    if (total_size == 0) {
+        return make_ready_future<>();
+    }
+
+    return do_with(std::move(reshard_jobs), [&dir, &db, ks_name, table_name, creator = std::move(creator), total_size] (std::vector& reshard_jobs) {
+        // Decimal megabytes (10^6 bytes), used for logging only.
+        auto total_size_mb = total_size / 1000000.0;
+        auto start = std::chrono::steady_clock::now();
+        dblog.info("{}", fmt::format("Resharding {:.2f} MB", total_size_mb));
+
+        return dir.invoke_on_all([&dir, &db, &reshard_jobs, ks_name, table_name, creator] (sstables::sstable_directory& d) mutable {
+            auto& table = db.local().find_column_family(ks_name, table_name);
+            // Each shard moves out only the entry at its own index.
+            auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
+            auto& cm = table.get_compaction_manager();
+            auto max_threshold = table.schema()->max_compaction_threshold();
+            auto& iop = service::get_local_streaming_read_priority();
+            return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] {
+                return d.move_foreign_sstables(dir);
+            });
+        }).then([start, total_size_mb] {
+            // add a microsecond to prevent division by zero
+            auto now = std::chrono::steady_clock::now() + 1us;
+            auto seconds = std::chrono::duration_cast>(now - start).count();
+            dblog.info("{}", fmt::format("Resharded {:.2f} MB in {:.2f} seconds, {:.2f} MB/s", total_size_mb, seconds, (total_size_mb / seconds)));
+            return make_ready_future<>();
+        });
+    });
+}
+
+// Global resharding function. Done in two parts:
+// - The first part spreads the foreign_sstable_open_info across shards so that all of them are
+//   resharding about the same amount of data
+// - The second part calls each shard's distributed object to reshard the SSTables they were
+//   assigned.
+future<> +distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) { + return collect_all_shared_sstables(dir).then([] (sstables::sstable_directory::sstable_info_vector all_jobs) mutable { + return distribute_reshard_jobs(std::move(all_jobs)); + }).then([&dir, &db, ks_name, table_name, creator = std::move(creator)] (std::vector destinations) mutable { + return run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, creator = std::move(creator)); + }); +} + // This function will iterate through upload directory in column family, // and will do the following for each sstable found: // 1) Mutate sstable level to 0. diff --git a/distributed_loader.hh b/distributed_loader.hh index fb1813d095..687605dc54 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -29,6 +29,7 @@ #include #include #include "seastarx.hh" +#include "sstables/compaction_descriptor.hh" class database; class table; @@ -44,6 +45,7 @@ namespace sstables { class entry_descriptor; class foreign_sstable_open_info; +class sstable_directory; } @@ -56,6 +58,10 @@ class migration_manager; class distributed_loader { public: + static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); + static future<> process_sstable_dir(sharded& dir); + static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); + static void reshard(distributed& db, sstring ks_name, sstring cf_name); static future<> open_sstable(distributed& db, sstables::entry_descriptor comps, std::function (column_family&, sstables::foreign_sstable_open_info)> func, diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 1ce6940ba8..9585f73f1e 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -19,7 +19,15 @@ * along with Scylla. If not, see . 
*/ +#include +#include "sstables/sstable_directory.hh" +#include "sstables/sstables.hh" +#include "sstables/compaction_manager.hh" +#include "log.hh" #include "sstable_directory.hh" +#include "lister.hh" + +static logging::logger dirlog("sstable_directory"); namespace sstables { @@ -32,4 +40,312 @@ bool manifest_json_filter(const fs::path&, const directory_entry& entry) { return true; } +sstable_directory::sstable_directory(fs::path sstable_dir, + unsigned load_parallelism, + need_mutate_level need_mutate_level, + lack_of_toc_fatal throw_on_missing_toc, + enable_dangerous_direct_import_of_cassandra_counters eddiocc, + allow_loading_materialized_view allow_mv, + sstable_object_from_existing_fn sstable_from_existing) + : _sstable_dir(std::move(sstable_dir)) + , _load_semaphore(load_parallelism) + , _need_mutate_level(need_mutate_level) + , _throw_on_missing_toc(throw_on_missing_toc) + , _enable_dangerous_direct_import_of_cassandra_counters(eddiocc) + , _allow_loading_materialized_view(allow_mv) + , _sstable_object_from_existing_sstable(std::move(sstable_from_existing)) + , _unshared_remote_sstables(smp::count) +{} + +void +sstable_directory::handle_component(scan_state& state, sstables::entry_descriptor desc, fs::path filename) { + // If not owned by us, skip + if ((desc.generation % smp::count) != this_shard_id()) { + return; + } + + dirlog.trace("for SSTable directory, scanning {}", filename); + state.generations_found.emplace(desc.generation, filename); + + switch (desc.component) { + case component_type::TemporaryStatistics: + // We generate TemporaryStatistics when we rewrite the Statistics file, + // for instance on mutate_level. We should delete it - so we mark it for deletion + // here, but just the component. The old statistics file should still be there + // and we'll go with it. 
+ _files_for_removal.insert(filename.native()); + break; + case component_type::TOC: + state.descriptors.push_back(std::move(desc)); + break; + case component_type::TemporaryTOC: + state.temp_toc_found.push_back(std::move(desc)); + break; + default: + // Do nothing, and will validate when trying to load the file. + break; + } +} + +void sstable_directory::validate(sstables::shared_sstable sst) const { + schema_ptr s = sst->get_schema(); + if (s->is_counter() && !sst->has_scylla_component()) { + sstring error = "Direct loading non-Scylla SSTables containing counters is not supported."; + if (_enable_dangerous_direct_import_of_cassandra_counters) { + dirlog.info("{} But trying to continue on user's request.", error); + } else { + dirlog.error("{} Use sstableloader instead.", error); + throw std::runtime_error(fmt::format("{} Use sstableloader instead.", error)); + } + } + if (s->is_view() && !_allow_loading_materialized_view) { + throw std::runtime_error("Loading Materialized View SSTables is not supported. 
Re-create the view instead."); + } +} + +future<> +sstable_directory::process_descriptor(sstables::entry_descriptor desc, const ::io_priority_class& iop) { + if (sstables::is_later(desc.version, _max_version_seen)) { + _max_version_seen = desc.version; + } + + auto sst = _sstable_object_from_existing_sstable(_sstable_dir, desc.generation, desc.version, desc.format); + return sst->load(iop).then([this, sst] { + validate(sst); + if (_need_mutate_level) { + dirlog.trace("Mutating {} to level 0\n", sst->get_filename()); + return sst->mutate_sstable_level(0); + } else { + return make_ready_future<>(); + } + }).then([sst, this] { + return sst->get_open_info().then([sst, this] (sstables::foreign_sstable_open_info info) { + auto shards = sst->get_shards_for_this_sstable(); + if (shards.size() == 1) { + if (shards[0] == this_shard_id()) { + dirlog.trace("{} identified as a local unshared SSTable", sst->get_filename()); + _unshared_local_sstables.push_back(sst); + } else { + dirlog.trace("{} identified as a remote unshared SSTable", sst->get_filename()); + _unshared_remote_sstables[shards[0]].push_back(std::move(info)); + } + } else { + dirlog.trace("{} identified as a shared SSTable", sst->get_filename()); + _shared_sstable_info.push_back(std::move(info)); + } + return make_ready_future<>(); + }); + }); +} + +int64_t +sstable_directory::highest_generation_seen() const { + return _max_generation_seen; +} + +sstables::sstable_version_types +sstable_directory::highest_version_seen() const { + return _max_version_seen; +} + +future<> +sstable_directory::process_sstable_dir(const ::io_priority_class& iop) { + dirlog.debug("Start processing directory {} for SSTables", _sstable_dir); + + // It seems wasteful that each shard is repeating this scan, and to some extent it is. + // However, we still want to open the files and especially call process_dir() in a distributed + // fashion not to overload any shard. 
Also in the common case the SSTables will all be + // unshared and be on the right shard based on their generation number. In light of that there are + // two advantages of having each shard repeat the directory listing: + // + // - The directory listing part already interacts with data_structures inside scan_state. We + // would have to either transfer a lot of file information among shards or complicate the code + // to make sure they all update their own version of scan_state and then merge it. + // - If all shards scan in parallel, they can start loading sooner. That is faster than having + // a separate step to fetch all files, followed by another step to distribute and process. + return do_with(scan_state{}, [this, &iop] (scan_state& state) { + return lister::scan_dir(_sstable_dir, { directory_entry_type::regular }, + [this, &state] (fs::path parent_dir, directory_entry de) { + auto comps = sstables::entry_descriptor::make_descriptor(_sstable_dir.native(), de.name); + handle_component(state, std::move(comps), parent_dir / fs::path(de.name)); + return make_ready_future<>(); + }, &manifest_json_filter).then([this, &state, &iop] { + // Always okay to delete files with a temporary TOC. We want to do it before we process + // the generations seen: it's okay to reuse those generations since the files will have + // been deleted anyway. + for (auto& desc: state.temp_toc_found) { + auto range = state.generations_found.equal_range(desc.generation); + for (auto it = range.first; it != range.second; ++it) { + auto& path = it->second; + dirlog.trace("Scheduling to remove file {}, from an SSTable with a Temporary TOC", path.native()); + _files_for_removal.insert(path.native()); + } + state.generations_found.erase(range.first, range.second); + } + + _max_generation_seen = boost::accumulate(state.generations_found | boost::adaptors::map_keys, int64_t(0), [] (int64_t a, int64_t b) { + return std::max(a, b); + }); + + dirlog.debug("{} After {} scanned, seen generation {}. 
{} descriptors found, {} different files found ", + _sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size()); + + // _descriptors is everything with a TOC. So after we remove this, what's left is + // SSTables for which a TOC was not found. + return parallel_for_each_restricted(state.descriptors, [this, &state, &iop] (sstables::entry_descriptor desc) { + state.generations_found.erase(desc.generation); + // This will try to pre-load this file and throw an exception if it is invalid + return process_descriptor(std::move(desc), iop); + }).then([this, &state] { + // For files missing TOC, it depends on where this is coming from. + // If scylla was supposed to have generated this SSTable, this is not okay and + // we refuse to proceed. If this coming from, say, an import, then we just delete, + // log and proceed. + for (auto& path : state.generations_found | boost::adaptors::map_values) { + if (_throw_on_missing_toc) { + throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable {}!. Refusing to boot", _sstable_dir.native(), path.native())); + } else { + dirlog.info("Found incomplete SSTable {} at directory {}. 
Removing", path.native(), _sstable_dir.native()); + _files_for_removal.insert(path.native()); + } + } + }); + }); + }); +} + +future<> +sstable_directory::commit_directory_changes() { + // Remove all files scheduled for removal + return parallel_for_each(std::exchange(_files_for_removal, {}), [] (sstring path) { + dirlog.info("Removing file {}", path); + return remove_file(std::move(path)); + }); +} + +future<> +sstable_directory::move_foreign_sstables(sharded& source_directory) { + return parallel_for_each(boost::irange(0u, smp::count), [this, &source_directory] (unsigned shard_id) mutable { + auto info_vec = std::exchange(_unshared_remote_sstables[shard_id], {}); + if (info_vec.empty()) { + return make_ready_future<>(); + } + // Should be empty, since an SSTable that belongs to this shard is not remote. + assert(shard_id != this_shard_id()); + dirlog.debug("{} Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id); + return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec)); + }); +} + +future<> +sstable_directory::load_foreign_sstables(sstable_info_vector info_vec) { + return parallel_for_each_restricted(info_vec, [this] (sstables::foreign_sstable_open_info& info) { + auto sst = _sstable_object_from_existing_sstable(_sstable_dir, info.generation, info.version, info.format); + return sst->load(std::move(info)).then([sst, this] { + _unshared_local_sstables.push_back(sst); + return make_ready_future<>(); + }); + }); +} + +future<> +sstable_directory::remove_input_sstables_from_resharding(const std::vector& sstlist) { + dirlog.debug("Removing {} resharded SSTables", sstlist.size()); + return parallel_for_each(sstlist, [] (sstables::shared_sstable sst) { + dirlog.trace("Removing resharded SSTable {}", sst->get_filename()); + return sst->unlink(); + }); +} + +future<> +sstable_directory::collect_output_sstables_from_resharding(std::vector resharded_sstables) { + dirlog.debug("Collecting {} 
resharded SSTables", resharded_sstables.size()); + return parallel_for_each(std::move(resharded_sstables), [this] (sstables::shared_sstable sst) { + auto shards = sst->get_shards_for_this_sstable(); + assert(shards.size() == 1); + auto shard = shards[0]; + + if (shard == this_shard_id()) { + dirlog.trace("Collected resharded SSTable {} already local", sst->get_filename()); + _unshared_local_sstables.push_back(std::move(sst)); + return make_ready_future<>(); + } + dirlog.trace("Collected resharded SSTable {} is remote. Storing it", sst->get_filename()); + return sst->get_open_info().then([this, shard, sst] (sstables::foreign_sstable_open_info info) { + _unshared_remote_sstables[shard].push_back(std::move(info)); + return make_ready_future<>(); + }); + }); +} + +future<> +sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& cm, table& table, + unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop) +{ + // Resharding doesn't like empty sstable sets, so bail early. There is nothing + // to reshard in this shard. + if (shared_info.empty()) { + return make_ready_future<>(); + } + + // We want to reshard many SSTables at a time for efficiency. However if we have to many we may + // be risking OOM. 
+ auto num_jobs = shared_info.size() / max_sstables_per_job + 1; + auto sstables_per_job = shared_info.size() / num_jobs; + + using reshard_buckets = std::vector>; + return do_with(reshard_buckets(1), [this, &cm, &table, sstables_per_job, iop, num_jobs, creator = std::move(creator), shared_info = std::move(shared_info)] (reshard_buckets& buckets) mutable { + return parallel_for_each(shared_info, [this, sstables_per_job, num_jobs, &buckets] (sstables::foreign_sstable_open_info& info) { + auto sst = _sstable_object_from_existing_sstable(_sstable_dir, info.generation, info.version, info.format); + return sst->load(std::move(info)).then([this, &buckets, sstables_per_job, num_jobs, sst = std::move(sst)] () mutable { + // Last bucket gets leftover SSTables + if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) { + buckets.emplace_back(); + } + buckets.back().push_back(std::move(sst)); + }); + }).then([this, &cm, &table, &buckets, iop, creator = std::move(creator)] () mutable { + // There is a semaphore inside the compaction manager in run_resharding_jobs. So we + // parallel_for_each so the statistics about pending jobs are updated to reflect all + // jobs. 
But only one will run in parallel at a time + return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector& sstlist) mutable { + return cm.run_resharding_job(&table, [this, iop, &cm, &table, creator, &sstlist] () { + sstables::compaction_descriptor desc(sstlist, {}, iop); + desc.options = sstables::compaction_options::make_reshard(); + desc.creator = std::move(creator); + + return sstables::compact_sstables(std::move(desc), table).then([this, &sstlist] (sstables::compaction_info result) { + return when_all_succeed(collect_output_sstables_from_resharding(std::move(result.new_sstables)), remove_input_sstables_from_resharding(sstlist)); + }); + }); + }); + }); + }); +} + +future<> +sstable_directory::do_for_each_sstable(std::function(sstables::shared_sstable)> func) { + return parallel_for_each_restricted(_unshared_local_sstables, std::move(func)); +} + +template +future<> +sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) { + return parallel_for_each(C, [this, func = std::move(func)] (auto& el) mutable { + return with_semaphore(_load_semaphore, 1, [this, func, el = std::move(el)] () mutable { + return func(el); + }); + }); +} + +void +sstable_directory::store_phaser(utils::phased_barrier::operation op) { + _operation_barrier.emplace(std::move(op)); +} + +sstable_directory::sstable_info_vector +sstable_directory::retrieve_shared_sstables() { + return std::exchange(_shared_sstable_info, {}); +} + } diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index a5bbf625ec..829d155364 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -23,9 +23,169 @@ #include "lister.hh" #include +#include +#include +#include +#include +#include +#include +#include "seastarx.hh" +#include "sstables/shared_sstable.hh" // sstables::shared_sstable +#include "sstables/version.hh" // sstable versions +#include "sstables/compaction_descriptor.hh" // for 
compaction_sstable_creator_fn
+#include "sstables/open_info.hh" // for entry_descriptor and foreign_sstable_open_info, chunked_vector wants to know if they are move constructible
+#include "utils/chunked_vector.hh"
+#include "utils/phased_barrier.hh"
+
+class compaction_manager;
 namespace sstables {
 bool manifest_json_filter(const std::filesystem::path&, const directory_entry& entry);
+// Handles a directory containing SSTables. It could be an auxiliary directory (like upload),
+// or the main directory.
+class sstable_directory {
+public:
+    using lack_of_toc_fatal = bool_class;
+    using need_mutate_level = bool_class;
+    using enable_dangerous_direct_import_of_cassandra_counters = bool_class;
+    using allow_loading_materialized_view = bool_class;
+
+    using sstable_object_from_existing_fn =
+        noncopyable_function;
+
+    // favor chunked vectors when dealing with file lists: they can grow to hundreds of thousands
+    // of elements.
+    using sstable_info_vector = utils::chunked_vector;
+private:
+    using scan_multimap = std::unordered_multimap;
+    using scan_descriptors = utils::chunked_vector;
+
+    // Transient state of a single directory scan.
+    struct scan_state {
+        // Every component file seen, keyed by the generation it belongs to.
+        scan_multimap generations_found;
+        // Descriptors of TemporaryTOC components.
+        scan_descriptors temp_toc_found;
+        // Descriptors of TOC components.
+        scan_descriptors descriptors;
+    };
+
+    // SSTable files to be deleted: things with a Temporary TOC, missing TOC files,
+    // TemporaryStatistics, etc. Not part of the scan state, because we want to do a 2-phase
+    // delete: maybe one of the shards will have signaled an error. And in the case of an error
+    // we don't want to delete anything.
+    std::unordered_set _files_for_removal;
+
+    // prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation.
+    // Will be destroyed when this object is destroyed.
+    std::optional _operation_barrier;
+
+    std::filesystem::path _sstable_dir;
+
+    // We may have hundreds of thousands of files to load. To protect against OOMs we will limit
+    // how many of them we process at the same time.
+    semaphore _load_semaphore;
+
+    // Flags below control how to behave when scanning new SSTables.
+    need_mutate_level _need_mutate_level;
+    lack_of_toc_fatal _throw_on_missing_toc;
+    enable_dangerous_direct_import_of_cassandra_counters _enable_dangerous_direct_import_of_cassandra_counters;
+    allow_loading_materialized_view _allow_loading_materialized_view;
+
+    // How to create an SSTable object from an existing SSTable file (respecting generation, etc)
+    sstable_object_from_existing_fn _sstable_object_from_existing_sstable;
+
+    int64_t _max_generation_seen = 0;
+    sstables::sstable_version_types _max_version_seen = sstables::sstable_version_types::ka;
+
+    // SSTables that are unshared and belong to this shard. They are already stored as an
+    // SSTable object.
+    utils::chunked_vector _unshared_local_sstables;
+
+    // SSTables that are unshared and belong to foreign shards. Because they are more conveniently
+    // stored as a foreign_sstable_open_info object, they are in a different attribute separate from the
+    // local SSTables.
+    //
+    // The indexes of the outer vector represent the shards. Having anything in the index
+    // representing this shard is illegal.
+    std::vector _unshared_remote_sstables;
+
+    // SSTables that are shared. Stored as foreign_sstable_open_info objects. Reason is those are
+    // the SSTables that we found, and not necessarily the ones we will reshard. We want to balance
+    // the amount of data resharded per shard, so a coordinator may redistribute this.
+    sstable_info_vector _shared_sstable_info;
+
+    future<> process_descriptor(sstables::entry_descriptor desc, const ::io_priority_class& iop);
+    void validate(sstables::shared_sstable sst) const;
+    void handle_component(scan_state& state, sstables::entry_descriptor desc, std::filesystem::path filename);
+    future<> remove_input_sstables_from_resharding(const std::vector& sstlist);
+    future<> collect_output_sstables_from_resharding(std::vector resharded_sstables);
+
+    template
+    future<> parallel_for_each_restricted(Container&& C, Func&& func);
+    future<> load_foreign_sstables(sstable_info_vector info_vec);
+public:
+    sstable_directory(std::filesystem::path sstable_dir,
+            unsigned load_parallelism,
+            need_mutate_level need_mutate,
+            lack_of_toc_fatal fatal_nontoc,
+            enable_dangerous_direct_import_of_cassandra_counters eddiocc,
+            allow_loading_materialized_view,
+            sstable_object_from_existing_fn sstable_from_existing);
+
+    // moves unshared SSTables that don't belong to this shard to the right shards.
+    future<> move_foreign_sstables(sharded& source_directory);
+
+    // returns what is the highest generation seen in this directory.
+    int64_t highest_generation_seen() const;
+
+    // returns what is the highest version seen in this directory.
+    sstables::sstable_version_types highest_version_seen() const;
+
+    // scans a directory containing SSTables. Every generation that is believed to belong to this
+    // shard is processed, the ones that are not are skipped. Potential pertinence is decided as
+    // generation % smp::count.
+    //
+    // Once this method returns, every SSTable that this shard processed can be in one of 3 states:
+    //  - unshared, local: not a shared SSTable, and indeed belongs to this shard.
+    //  - unshared, remote: not a shared SSTable, but belongs to a remote shard.
+    //  - shared : shared SSTable that belongs to many shards. Must be resharded before using
+    //
+    // This function doesn't change on-storage state. If files are to be removed, a separate call
+    // (commit_directory_changes()) has to be issued. This is to make sure that all instances of this
+    // class in a sharded service have the opportunity to validate its files.
+    future<> process_sstable_dir(const ::io_priority_class& iop);
+
+    // If files were scheduled to be removed, they will be removed after this call.
+    future<> commit_directory_changes();
+
+    // reshards a collection of SSTables.
+    //
+    // A reference to the compaction manager must be passed so we can register with it. Knowing
+    // which table is being processed is a requirement of the compaction manager, so this must be
+    // passed too.
+    //
+    // We will reshard max_sstables_per_job at once.
+    //
+    // A creator function must be passed that will create an SSTable object in the correct shard,
+    // and an I/O priority must be specified.
+    future<> reshard(sstable_info_vector info, compaction_manager& cm, table& table,
+                     unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator,
+                     const ::io_priority_class& iop);
+
+    // Store a phased operation. Usually used to keep an object alive while the directory is being
+    // processed. One example is preventing table drops concurrent to the processing of this
+    // directory.
+    void store_phaser(utils::phased_barrier::operation op);
+
+    // Helper function that processes all unshared SSTables belonging to this shard, respecting the
+    // concurrency limit.
+    future<> do_for_each_sstable(std::function(sstables::shared_sstable)> func);
+
+    // Retrieves the list of shared SSTables in this object. The list will be reset once this
+    // is called.
+    sstable_info_vector retrieve_shared_sstables();
+};
+
 }
diff --git a/test/boost/sstable_directory_test.cc b/test/boost/sstable_directory_test.cc
new file mode 100644
index 0000000000..b39455c84e
--- /dev/null
+++ b/test/boost/sstable_directory_test.cc
@@ -0,0 +1,582 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + +#include +#include +#include +#include "sstables/sstable_directory.hh" +#include "distributed_loader.hh" +#include "test/lib/sstable_utils.hh" +#include "test/lib/cql_test_env.hh" +#include "test/lib/tmpdir.hh" +#include "db/config.hh" + +#include "fmt/format.h" + +schema_ptr test_table_schema() { + static thread_local auto s = [] { + schema_builder builder(make_lw_shared(schema( + generate_legacy_id("ks", "cf"), "ks", "cf", + // partition key + {{"p", bytes_type}}, + // clustering key + {}, + // regular columns + {{"c", int32_type}}, + // static columns + {}, + // regular column name type + bytes_type, + // comment + "" + ))); + return builder.build(schema_builder::compact_storage::no); + }(); + return s; +} + +using namespace sstables; + +future +make_sstable_for_this_shard(std::function sst_factory) { + auto s = test_table_schema(); + auto key_token_pair = token_generation_for_shard(1, this_shard_id(), 12); + auto key = partition_key::from_exploded(*s, {to_bytes(key_token_pair[0].first)}); + mutation m(s, key); + m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0)); + return make_ready_future(make_sstable_containing(sst_factory, {m})); +} + +/// Create a shared SSTable belonging to all shards for the following schema: "create table cf (p text PRIMARY KEY, c int)" +/// +/// Arguments 
passed to the function are passed to table::make_sstable +template +future +make_sstable_for_all_shards(database& db, table& table, fs::path sstdir, int64_t generation, Args&&... args) { + // Unlike the previous helper, we'll assume we're in a thread here. It's less flexible + // but the users are usually in a thread, and rewrite_toc_without_scylla_component requires + // a thread. We could fix that, but deferring that for now. + auto s = table.schema(); + auto mt = make_lw_shared(s); + auto msb = db.get_config().murmur3_partitioner_ignore_msb_bits(); + for (shard_id shard = 0; shard < smp::count; ++shard) { // one partition per shard, keyed via token_generation_for_shard(1, shard, msb) + auto key_token_pair = token_generation_for_shard(1, shard, msb); + auto key = partition_key::from_exploded(*s, {to_bytes(key_token_pair[0].first)}); + mutation m(s, key); + m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0)); + mt->apply(std::move(m)); + } + auto sst = table.make_sstable(sstdir.native(), generation++, args...); // generation is a by-value parameter, so the post-increment is not visible to the caller + write_memtable_to_sstable(*mt, sst, table.get_sstables_manager().configure_writer()).get(); + mt->clear_gently().get(); + // We can't write an SSTable with bad sharding, so pretend + // it came from Cassandra + sstables::test(sst).remove_component(sstables::component_type::Scylla).get(); + sstables::test(sst).rewrite_toc_without_scylla_component(); + return make_ready_future(sst); +} + +sstables::shared_sstable sstable_from_existing_file(fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { // SSTable-creation callback that the tests below pass to sstdir.start() + return test_sstables_manager.make_sstable(test_table_schema(), dir.native(), gen, v, f, gc_clock::now(), default_io_error_handler_gen(), default_sstable_buffer_size); +} + +template +sstables::shared_sstable new_sstable(fs::path dir, int64_t gen) { // always creates the SSTable object with the mc version and the big format + return test_sstables_manager.make_sstable(test_table_schema(), dir.native(), gen, + sstables::sstable_version_types::mc, sstables::sstable_format_types::big, + gc_clock::now(),
default_io_error_handler_gen(), default_sstable_buffer_size); +} + +// There is code for this in distributed_loader.cc but this is so simple it is not worth polluting +// the public namespace for it. Repeat it here. The result is the largest per-shard sstable_directory::highest_generation_seen(), and never less than the initial value 0. +inline future +highest_generation_seen(sharded& dir) { + return dir.map_reduce0(std::mem_fn(&sstable_directory::highest_generation_seen), int64_t(0), [] (int64_t a, int64_t b) { + return std::max(a, b); + }); +} + +SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) { + auto dir = tmpdir(); + + // Write a manifest file to make sure it's ignored + auto manifest = dir.path() / "manifest.json"; + auto f = open_file_dma(manifest.native(), open_flags::wo | open_flags::create | open_flags::truncate).get0(); + f.close().get(); + + sharded sstdir; + sstdir.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::no, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop = defer([&sstdir] { // stop the sharded service when the test scope is left, including on a failed assertion + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); + // No generation found on empty directory. + BOOST_REQUIRE_EQUAL(max_generation_seen, 0); +} + +// Test unrecoverable SSTable: missing a file that is expected in the TOC. +SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) { + auto dir = tmpdir(); + auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0(); + + // Now there is one sstable in the directory, but it is incomplete and one component is missing.
+ // We should fail validation and leave the directory untouched (only the failure is asserted below) + remove_file(sst->filename(sstables::component_type::Statistics)).get(); + + sharded sstdir; + sstdir.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::no, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir); + BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); +} + +// Test always-benign incomplete SSTable: TemporaryTOC found +SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_temporary_toc) { + auto dir = tmpdir(); + auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0(); + rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get(); + + sharded sstdir; + sstdir.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, // the TOC was renamed to TemporaryTOC above; the scan must still succeed with this flag set + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + auto expect_ok = distributed_loader::process_sstable_dir(sstdir); + BOOST_REQUIRE_NO_THROW(expect_ok.get()); +} + +// Test the absence of TOC.
Behavior is controllable by a flag +SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) { + auto dir = tmpdir(); + + auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0(); + remove_file(sst->filename(sstables::component_type::TOC)).get(); /* the TOC component is gone; both scans below see the same directory */ + + sharded sstdir_fatal; + sstdir_fatal.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, /* flag set: the scan below must throw */ + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop_fatal = defer([&sstdir_fatal] { + sstdir_fatal.stop().get(); + }); + + auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal); + BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); + + sharded sstdir_ok; + sstdir_ok.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::no, /* flag clear: the same directory must scan without throwing */ + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop_ok = defer([&sstdir_ok] { + sstdir_ok.stop().get(); + }); + + auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok); + BOOST_REQUIRE_NO_THROW(expect_ok.get()); +} + +// Test the presence of TemporaryStatistics. If the old Statistics file is around +// this is benign and we'll just delete the temporary file and move on. If the old Statistics file +// is not around (but mentioned in the TOC), then this is an error.
+SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) { + auto dir = tmpdir(); + + auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0(); + auto tempstr = sst->filename(dir.path().native(), component_type::TemporaryStatistics); + auto f = open_file_dma(tempstr, open_flags::rw | open_flags::create | open_flags::truncate).get0(); + f.close().get(); /* a TemporaryStatistics file now exists alongside the SSTable written above */ + auto tempstat = fs::canonical(fs::path(tempstr)); + + sharded sstdir_ok; + sstdir_ok.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::no, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop_ok= defer([&sstdir_ok] { + sstdir_ok.stop().get(); + }); + + auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok); + BOOST_REQUIRE_NO_THROW(expect_ok.get()); + lister::scan_dir(dir.path(), { directory_entry_type::regular }, [tempstat] (fs::path parent_dir, directory_entry de) { + BOOST_REQUIRE(fs::canonical(parent_dir / fs::path(de.name)) != tempstat); /* no regular file left in the directory may be the temporary one */ + return make_ready_future<>(); + }).get(); + + remove_file(sst->filename(sstables::component_type::Statistics)).get(); /* second half: with Statistics itself removed the scan must fail */ + + sharded sstdir_fatal; + sstdir_fatal.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::no, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop_fatal = defer([&sstdir_fatal] { + sstdir_fatal.stop().get(); + }); + + auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal); + BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); +} + +// Test that we see the right generation during the scan.
Temporary files are skipped +SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) { + auto dir = tmpdir(); + make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 3333)).get0(); + auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 6666)).get0(); + rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get(); /* generation 6666 now only has a TemporaryTOC, so 3333 must be reported as the highest */ + + sharded sstdir; + sstdir.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); + BOOST_REQUIRE_EQUAL(max_generation_seen, 3333); +} + +future<> verify_that_all_sstables_are_local(sharded& sstdir, unsigned expected_sstables) { /* Runs do_for_each_sstable() on every shard, requires each visited SSTable to be owned by exactly the visiting shard, and finally requires the number of visited SSTables to equal expected_sstables. */ + return do_with(std::make_unique>(0), [&sstdir, expected_sstables] (std::unique_ptr>& count) { + return sstdir.invoke_on_all([count = count.get()] (sstable_directory& d) { + return d.do_for_each_sstable([count] (sstables::shared_sstable sst) { + count->fetch_add(1, std::memory_order_relaxed); /* a single counter is incremented from every shard */ + auto shards = sst->get_shards_for_this_sstable(); + BOOST_REQUIRE_EQUAL(shards.size(), 1); + BOOST_REQUIRE_EQUAL(shards[0], this_shard_id()); + return make_ready_future<>(); + }); + }).then([count = count.get(), expected_sstables] { + BOOST_REQUIRE_EQUAL(count->load(std::memory_order_relaxed), expected_sstables); + return make_ready_future<>(); + }); + }); +} + +// Test that all SSTables are seen as unshared, if the generation numbers match what their +// shard-assignments expect +SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_generations) { + auto dir = tmpdir(); + for (shard_id i = 0; i <
smp::count; ++i) { + smp::submit_to(i, [dir = dir.path(), i] { + // This is why it is annoying for the internal functions in the test infrastructure to + // assume threaded execution + return seastar::async([dir, i] { + make_sstable_for_this_shard(std::bind(new_sstable, dir, i)).get0(); // shard i writes generation i + }); + }).get(); + } + + sharded sstdir; + sstdir.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + verify_that_all_sstables_are_local(sstdir, smp::count).get(); // one SSTable was written per shard above +} + +// Test that all SSTables are seen as unshared, even if the generation numbers do not match what their +// shard-assignments expect +SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_generations) { + auto dir = tmpdir(); + for (shard_id i = 0; i < smp::count; ++i) { + smp::submit_to(i, [dir = dir.path(), i] { + // This is why it is annoying for the internal functions in the test infrastructure to + // assume threaded execution + return seastar::async([dir, i] { + make_sstable_for_this_shard(std::bind(new_sstable, dir, i + 1)).get0(); // shard i writes generation i + 1 + }); + }).get(); + } + + sharded sstdir; + sstdir.start(dir.path(), 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + verify_that_all_sstables_are_local(sstdir, smp::count).get(); // one SSTable was written per shard above +} + +// Test that the sstable_dir object can keep the table alive against a drop
+SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); + auto ks_name = "ks"; + auto cf_name = "cf"; + auto& cf = e.local_db().find_column_family(ks_name, cf_name); + auto path = fs::path(cf.dir()); + + sharded sstdir; + sstdir.start(path, 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::no, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + &sstable_from_existing_file).get(); + + // stop cleanly in case we fail early for unexpected reasons + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::lock_table(sstdir, e.db(), ks_name, cf_name).get(); // every shard's sstable_directory now stores the table's write_in_progress() phaser + + auto drop = e.execute_cql("drop table cf"); // deliberately not waited for yet + later().get(); // yield once so the pending drop gets a chance to run + + auto table_ok = e.db().invoke_on_all([ks_name, cf_name] (database& db) { + db.find_column_family(ks_name, cf_name); // must not throw: the table has to be present on every shard while the phaser is held + }); + BOOST_REQUIRE_NO_THROW(table_ok.get()); + + // Stop manually now, to allow for the object to be destroyed and take the + // phaser with it.
+ stop.cancel(); + sstdir.stop().get(); + drop.get(); // only now is the drop waited for + + auto no_such_table = e.db().invoke_on_all([ks_name, cf_name] (database& db) { + db.find_column_family(ks_name, cf_name); + return make_ready_future<>(); + }); + BOOST_REQUIRE_THROW(no_such_table.get(), no_such_column_family); + }); +} + +SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) { // Shared SSTables placed in upload/ must end up resharded into shard-local ones + if (smp::count == 1) { + fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n"); + return make_ready_future<>(); + } + + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); + auto& cf = e.local_db().find_column_family("ks", "cf"); + auto upload_path = fs::path(cf.dir()) / "upload"; + + e.db().invoke_on_all([] (database& db) { + auto& cf = db.find_column_family("ks", "cf"); + cf.disable_auto_compaction(); + }).get(); + + unsigned num_sstables = 10 * smp::count; + auto generation = 0; + for (unsigned nr = 0; nr < num_sstables; ++nr) { + make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get(); + } + + sharded sstdir; + sstdir.start(upload_path, 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + [&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.make_sstable(dir.native(), gen, v, f); + }).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + verify_that_all_sstables_are_local(sstdir, 0).get(); // every SSTable written above is shared, so no unshared one may be seen yet + + int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); + std::atomic generation_for_test = {}; +
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed); // new SSTables get generations above everything found by the scan + + distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { + auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed); + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big); + }).get(); + verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get(); // NOTE(review): presumably one reshard job per shard, each emitting one SSTable per shard - confirm + }); +} + +SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_even_if_files_are_not_well_distributed) { // Same as sstable_directory_shared_sstables_reshard_correctly, but with generations spaced smp::count apart + if (smp::count == 1) { + fmt::print("Skipping sstable_directory_shared_sstables_reshard_distributes_well_even_if_files_are_not_well_distributed, smp == 1\n"); + return make_ready_future<>(); + } + + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); + auto& cf = e.local_db().find_column_family("ks", "cf"); + auto upload_path = fs::path(cf.dir()) / "upload"; + + e.db().invoke_on_all([] (database& db) { + auto& cf = db.find_column_family("ks", "cf"); + cf.disable_auto_compaction(); + }).get(); + + unsigned num_sstables = 10 * smp::count; + auto generation = 0; + for (unsigned nr = 0; nr < num_sstables; ++nr) { + make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++ * smp::count, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get(); // every generation is a multiple of smp::count; presumably that makes the initial per-shard distribution of files uneven - confirm + } + + sharded sstdir; + sstdir.start(upload_path, 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, + sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + [&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.make_sstable(dir.native(),
gen, v, f); + }).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + verify_that_all_sstables_are_local(sstdir, 0).get(); // every SSTable written above is shared, so no unshared one may be seen yet + + int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); + std::atomic generation_for_test = {}; + generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed); + + distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { + auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed); + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big); + }).get(); + verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get(); // same expected count as in sstable_directory_shared_sstables_reshard_correctly + }); +} + +SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshold) { // Resharding with more input SSTables than max_compaction_threshold() times the shard count + if (smp::count == 1) { + fmt::print("Skipping sstable_directory_shared_sstables_reshard_respect_max_threshold, smp == 1\n"); + return make_ready_future<>(); + } + + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); + auto& cf = e.local_db().find_column_family("ks", "cf"); + auto upload_path = fs::path(cf.dir()) / "upload"; + + e.db().invoke_on_all([] (database& db) { + auto& cf = db.find_column_family("ks", "cf"); + cf.disable_auto_compaction(); + }).get(); + + unsigned num_sstables = (cf.schema()->max_compaction_threshold() + 1) * smp::count; // NOTE(review): presumably one SSTable more per shard than a single reshard job may take - confirm + auto generation = 0; + for (unsigned nr = 0; nr < num_sstables; ++nr) { + make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get(); + } + + sharded sstdir; + sstdir.start(upload_path, 1, + sstable_directory::need_mutate_level::no, + sstable_directory::lack_of_toc_fatal::yes, +
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no, + sstable_directory::allow_loading_materialized_view::no, + [&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.make_sstable(dir.native(), gen, v, f); + }).get(); + + auto stop = defer([&sstdir] { + sstdir.stop().get(); + }); + + distributed_loader::process_sstable_dir(sstdir).get(); + verify_that_all_sstables_are_local(sstdir, 0).get(); // every SSTable written above is shared, so no unshared one may be seen yet + + int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); + std::atomic generation_for_test = {}; + generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed); + + distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { + auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed); + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big); + }).get(); + verify_that_all_sstables_are_local(sstdir, 2 * smp::count * smp::count).get(); // twice the count asserted by the two tests above; presumably two reshard jobs per shard - confirm + }); +}