distributed_load: initial handling of off-strategy SSTables

Off-strategy SSTables are SSTables that do not conform to the invariants
that the compaction strategies define. Examples of off-strategy SSTables
are SSTables acquired through bootstrap, SSTables that need resharding
when the CPU count changes, and SSTables imported from other databases
through our upload directory.

This patch introduces a new class, sstable_directory, that will
handle SSTables that are present in a directory that is not one of the
directories where the table expects its SSTables.

There is much to be done to support off-strategy compactions fully. To
make sure we make incremental progress, this patch implements enough
code to handle resharding of SSTables in the upload directory. SSTables
are resharded in place, before we start accessing the files.

Later, we will take other steps before we finally move the SSTables into
the main directory. But for now, starting with resharding will not only
allow us to start small, but it will also allow us to start unleashing
much-needed cleanups in many places. For instance, once we start
resharding on boot before making the SSTables available, we will be able
to expunge all the places in Scylla where, during normal operation, we
carry extra handling code for the fact that SSTables could be shared.

Tests: a new test is added and it passes in debug mode.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Glauber Costa
2020-05-25 16:19:09 -04:00
parent e48ad3dc23
commit aebd965f0e
6 changed files with 1206 additions and 0 deletions


@@ -365,6 +365,7 @@ scylla_tests = set([
'test/boost/schema_changes_test',
'test/boost/sstable_conforms_to_mutation_source_test',
'test/boost/sstable_resharding_test',
'test/boost/sstable_directory_test',
'test/boost/sstable_test',
'test/boost/storage_proxy_test',
'test/boost/top_k_test',


@@ -239,6 +239,147 @@ future<> distributed_loader::verify_owner_and_mode(fs::path path) {
});
};
future<>
distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& dir) {
return dir.invoke_on_all([&dir] (sstables::sstable_directory& d) {
// Supposed to be called with the node either down or on behalf of maintenance tasks
// like nodetool refresh
return d.process_sstable_dir(service::get_local_streaming_read_priority()).then([&dir, &d] {
return d.move_foreign_sstables(dir);
});
}).then([&dir] {
return dir.invoke_on_all([&dir] (sstables::sstable_directory& d) {
return d.commit_directory_changes();
});
});
}
future<>
distributed_loader::lock_table(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring cf_name) {
return dir.invoke_on_all([&db, ks_name, cf_name] (sstables::sstable_directory& d) {
auto& table = db.local().find_column_family(ks_name, cf_name);
d.store_phaser(table.write_in_progress());
return make_ready_future<>();
});
}
// Helper structure for resharding.
//
// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and
// need to be resharded. Each shard will keep one such descriptor, that contains the list of
// SSTables assigned to it, and their total size. The total size is used to make sure we are
// fairly balancing SSTables among shards.
struct reshard_shard_descriptor {
sstables::sstable_directory::sstable_info_vector info_vec;
uint64_t uncompressed_data_size = 0;
bool total_size_smaller(const reshard_shard_descriptor& rhs) const {
return uncompressed_data_size < rhs.uncompressed_data_size;
}
uint64_t size() const {
return uncompressed_data_size;
}
};
// Collects shared SSTables from all shards and returns a vector containing them all.
// This function assumes that the list of SSTables can be fairly big so it is careful to
// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
future<sstables::sstable_directory::sstable_info_vector>
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir) {
return do_with(sstables::sstable_directory::sstable_info_vector(), [&dir] (sstables::sstable_directory::sstable_info_vector& info_vec) {
// We want to make sure that each distributed object reshards about the same amount of data.
// Each sharded object has its own shared SSTables. We could use a clever algorithm in which
// the shards figure out among themselves which SSTables to exchange, but we'll keep it simple
// and move all their foreign_sstable_open_info to a coordinator (the shard that called this
// function). We can move them in bulk, which is efficient. That shard can then distribute the
// work among all the others, who will do the resharding.
auto coordinator = this_shard_id();
// We will first move all of the foreign open info to temporary storage so that we can sort
// them. We want to distribute bigger sstables first.
return dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) {
return smp::submit_to(coordinator, [&info_vec, info = d.retrieve_shared_sstables()] () mutable {
// We want do_for_each here instead of a loop to avoid stalls. Resharding can be
// called during node operations too. For example, if it is called to load new
// SSTables into the system.
return do_for_each(info, [&info_vec] (sstables::foreign_sstable_open_info& info) {
info_vec.push_back(std::move(info));
});
});
}).then([&info_vec] () mutable {
return make_ready_future<sstables::sstable_directory::sstable_info_vector>(std::move(info_vec));
});
});
}
// Given a vector of shared sstables to be resharded, distribute it among all shards.
// The vector is first sorted to make sure that we are moving the biggest SSTables first.
//
// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do.
future<std::vector<reshard_shard_descriptor>>
distribute_reshard_jobs(sstables::sstable_directory::sstable_info_vector source) {
return do_with(std::move(source), std::vector<reshard_shard_descriptor>(smp::count),
[] (sstables::sstable_directory::sstable_info_vector& source, std::vector<reshard_shard_descriptor>& destinations) mutable {
std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
// Sort on descending SSTable sizes.
return a.uncompressed_data_size > b.uncompressed_data_size;
});
return do_for_each(source, [&destinations] (sstables::foreign_sstable_open_info& info) mutable {
auto shard_it = boost::min_element(destinations, std::mem_fn(&reshard_shard_descriptor::total_size_smaller));
shard_it->uncompressed_data_size += info.uncompressed_data_size;
shard_it->info_vec.push_back(std::move(info));
}).then([&destinations] () mutable {
return make_ready_future<std::vector<reshard_shard_descriptor>>(std::move(destinations));
});
});
}
future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vector<reshard_shard_descriptor> reshard_jobs,
sharded<database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) {
uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0));
if (total_size == 0) {
return make_ready_future<>();
}
return do_with(std::move(reshard_jobs), [&dir, &db, ks_name, table_name, creator = std::move(creator), total_size] (std::vector<reshard_shard_descriptor>& reshard_jobs) {
auto total_size_mb = total_size / 1000000.0;
auto start = std::chrono::steady_clock::now();
dblog.info("{}", fmt::format("Resharding {:.2f} MB", total_size_mb));
return dir.invoke_on_all([&dir, &db, &reshard_jobs, ks_name, table_name, creator] (sstables::sstable_directory& d) mutable {
auto& table = db.local().find_column_family(ks_name, table_name);
auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
auto& cm = table.get_compaction_manager();
auto max_threshold = table.schema()->max_compaction_threshold();
auto& iop = service::get_local_streaming_read_priority();
return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] {
return d.move_foreign_sstables(dir);
});
}).then([start, total_size_mb] {
// add a microsecond to prevent division by zero
auto now = std::chrono::steady_clock::now() + 1us;
auto seconds = std::chrono::duration_cast<std::chrono::duration<float>>(now - start).count();
dblog.info("{}", fmt::format("Resharded {:.2f} MB in {:.2f} seconds, {:.2f} MB/s", total_size_mb, seconds, (total_size_mb / seconds)));
return make_ready_future<>();
});
});
}
// Global resharding function. Done in two parts:
// - The first part spreads the foreign_sstable_open_info across shards so that all of them are
// resharding about the same amount of data
// - The second part calls each shard's distributed object to reshard the SSTables they were
// assigned.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) {
return collect_all_shared_sstables(dir).then([] (sstables::sstable_directory::sstable_info_vector all_jobs) mutable {
return distribute_reshard_jobs(std::move(all_jobs));
}).then([&dir, &db, ks_name, table_name, creator = std::move(creator)] (std::vector<reshard_shard_descriptor> destinations) mutable {
return run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator));
});
}
// This function will iterate through upload directory in column family,
// and will do the following for each sstable found:
// 1) Mutate sstable level to 0.


@@ -29,6 +29,7 @@
#include <vector>
#include <functional>
#include "seastarx.hh"
#include "sstables/compaction_descriptor.hh"
class database;
class table;
@@ -44,6 +45,7 @@ namespace sstables {
class entry_descriptor;
class foreign_sstable_open_info;
class sstable_directory;
}
@@ -56,6 +58,10 @@ class migration_manager;
class distributed_loader {
public:
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator);
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring cf_name);
static void reshard(distributed<database>& db, sstring ks_name, sstring cf_name);
static future<> open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func,


@@ -19,7 +19,15 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/adaptor/map.hpp>
#include "sstables/sstable_directory.hh"
#include "sstables/sstables.hh"
#include "sstables/compaction_manager.hh"
#include "log.hh"
#include "sstable_directory.hh"
#include "lister.hh"
static logging::logger dirlog("sstable_directory");
namespace sstables {
@@ -32,4 +40,312 @@ bool manifest_json_filter(const fs::path&, const directory_entry& entry) {
return true;
}
sstable_directory::sstable_directory(fs::path sstable_dir,
unsigned load_parallelism,
need_mutate_level need_mutate_level,
lack_of_toc_fatal throw_on_missing_toc,
enable_dangerous_direct_import_of_cassandra_counters eddiocc,
allow_loading_materialized_view allow_mv,
sstable_object_from_existing_fn sstable_from_existing)
: _sstable_dir(std::move(sstable_dir))
, _load_semaphore(load_parallelism)
, _need_mutate_level(need_mutate_level)
, _throw_on_missing_toc(throw_on_missing_toc)
, _enable_dangerous_direct_import_of_cassandra_counters(eddiocc)
, _allow_loading_materialized_view(allow_mv)
, _sstable_object_from_existing_sstable(std::move(sstable_from_existing))
, _unshared_remote_sstables(smp::count)
{}
void
sstable_directory::handle_component(scan_state& state, sstables::entry_descriptor desc, fs::path filename) {
// If not owned by us, skip
if ((desc.generation % smp::count) != this_shard_id()) {
return;
}
dirlog.trace("for SSTable directory, scanning {}", filename);
state.generations_found.emplace(desc.generation, filename);
switch (desc.component) {
case component_type::TemporaryStatistics:
// We generate TemporaryStatistics when we rewrite the Statistics file,
// for instance on mutate_level. We should delete it - so we mark it for deletion
// here, but just the component. The old statistics file should still be there
// and we'll go with it.
_files_for_removal.insert(filename.native());
break;
case component_type::TOC:
state.descriptors.push_back(std::move(desc));
break;
case component_type::TemporaryTOC:
state.temp_toc_found.push_back(std::move(desc));
break;
default:
// Do nothing, and will validate when trying to load the file.
break;
}
}
void sstable_directory::validate(sstables::shared_sstable sst) const {
schema_ptr s = sst->get_schema();
if (s->is_counter() && !sst->has_scylla_component()) {
sstring error = "Direct loading non-Scylla SSTables containing counters is not supported.";
if (_enable_dangerous_direct_import_of_cassandra_counters) {
dirlog.info("{} But trying to continue on user's request.", error);
} else {
dirlog.error("{} Use sstableloader instead.", error);
throw std::runtime_error(fmt::format("{} Use sstableloader instead.", error));
}
}
if (s->is_view() && !_allow_loading_materialized_view) {
throw std::runtime_error("Loading Materialized View SSTables is not supported. Re-create the view instead.");
}
}
future<>
sstable_directory::process_descriptor(sstables::entry_descriptor desc, const ::io_priority_class& iop) {
if (sstables::is_later(desc.version, _max_version_seen)) {
_max_version_seen = desc.version;
}
auto sst = _sstable_object_from_existing_sstable(_sstable_dir, desc.generation, desc.version, desc.format);
return sst->load(iop).then([this, sst] {
validate(sst);
if (_need_mutate_level) {
dirlog.trace("Mutating {} to level 0", sst->get_filename());
return sst->mutate_sstable_level(0);
} else {
return make_ready_future<>();
}
}).then([sst, this] {
return sst->get_open_info().then([sst, this] (sstables::foreign_sstable_open_info info) {
auto shards = sst->get_shards_for_this_sstable();
if (shards.size() == 1) {
if (shards[0] == this_shard_id()) {
dirlog.trace("{} identified as a local unshared SSTable", sst->get_filename());
_unshared_local_sstables.push_back(sst);
} else {
dirlog.trace("{} identified as a remote unshared SSTable", sst->get_filename());
_unshared_remote_sstables[shards[0]].push_back(std::move(info));
}
} else {
dirlog.trace("{} identified as a shared SSTable", sst->get_filename());
_shared_sstable_info.push_back(std::move(info));
}
return make_ready_future<>();
});
});
}
int64_t
sstable_directory::highest_generation_seen() const {
return _max_generation_seen;
}
sstables::sstable_version_types
sstable_directory::highest_version_seen() const {
return _max_version_seen;
}
future<>
sstable_directory::process_sstable_dir(const ::io_priority_class& iop) {
dirlog.debug("Start processing directory {} for SSTables", _sstable_dir);
// It seems wasteful that each shard is repeating this scan, and to some extent it is.
// However, we still want to open the files and especially call process_dir() in a distributed
// fashion not to overload any shard. Also in the common case the SSTables will all be
// unshared and be on the right shard based on their generation number. In light of that there are
// two advantages of having each shard repeat the directory listing:
//
// - The directory listing part already interacts with data structures inside scan_state. We
// would have to either transfer a lot of file information among shards or complicate the code
// to make sure they all update their own version of scan_state and then merge it.
// - If all shards scan in parallel, they can start loading sooner. That is faster than having
// a separate step to fetch all files, followed by another step to distribute and process.
return do_with(scan_state{}, [this, &iop] (scan_state& state) {
return lister::scan_dir(_sstable_dir, { directory_entry_type::regular },
[this, &state] (fs::path parent_dir, directory_entry de) {
auto comps = sstables::entry_descriptor::make_descriptor(_sstable_dir.native(), de.name);
handle_component(state, std::move(comps), parent_dir / fs::path(de.name));
return make_ready_future<>();
}, &manifest_json_filter).then([this, &state, &iop] {
// Always okay to delete files with a temporary TOC. We want to do it before we process
// the generations seen: it's okay to reuse those generations since the files will have
// been deleted anyway.
for (auto& desc: state.temp_toc_found) {
auto range = state.generations_found.equal_range(desc.generation);
for (auto it = range.first; it != range.second; ++it) {
auto& path = it->second;
dirlog.trace("Scheduling to remove file {}, from an SSTable with a Temporary TOC", path.native());
_files_for_removal.insert(path.native());
}
state.generations_found.erase(range.first, range.second);
}
_max_generation_seen = boost::accumulate(state.generations_found | boost::adaptors::map_keys, int64_t(0), [] (int64_t a, int64_t b) {
return std::max<int64_t>(a, b);
});
dirlog.debug("After {} scanned, seen generation {}. {} descriptors found, {} different files found",
_sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size());
// _descriptors is everything with a TOC. So after we remove this, what's left is
// SSTables for which a TOC was not found.
return parallel_for_each_restricted(state.descriptors, [this, &state, &iop] (sstables::entry_descriptor desc) {
state.generations_found.erase(desc.generation);
// This will try to pre-load this file and throw an exception if it is invalid
return process_descriptor(std::move(desc), iop);
}).then([this, &state] {
// For files missing a TOC, it depends on where this is coming from.
// If Scylla was supposed to have generated this SSTable, this is not okay and
// we refuse to proceed. If it is coming from, say, an import, then we just delete,
// log and proceed.
for (auto& path : state.generations_found | boost::adaptors::map_values) {
if (_throw_on_missing_toc) {
throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable {}! Refusing to boot", _sstable_dir.native(), path.native()));
} else {
dirlog.info("Found incomplete SSTable {} at directory {}. Removing", path.native(), _sstable_dir.native());
_files_for_removal.insert(path.native());
}
}
});
});
});
}
future<>
sstable_directory::commit_directory_changes() {
// Remove all files scheduled for removal
return parallel_for_each(std::exchange(_files_for_removal, {}), [] (sstring path) {
dirlog.info("Removing file {}", path);
return remove_file(std::move(path));
});
}
future<>
sstable_directory::move_foreign_sstables(sharded<sstable_directory>& source_directory) {
return parallel_for_each(boost::irange(0u, smp::count), [this, &source_directory] (unsigned shard_id) mutable {
auto info_vec = std::exchange(_unshared_remote_sstables[shard_id], {});
if (info_vec.empty()) {
return make_ready_future<>();
}
// An SSTable that belongs to this shard is not remote, so the vector for our own
// shard must have been empty and we returned above.
assert(shard_id != this_shard_id());
dirlog.debug("Moving {} unshared SSTables to shard {}", info_vec.size(), shard_id);
return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec));
});
}
future<>
sstable_directory::load_foreign_sstables(sstable_info_vector info_vec) {
return parallel_for_each_restricted(info_vec, [this] (sstables::foreign_sstable_open_info& info) {
auto sst = _sstable_object_from_existing_sstable(_sstable_dir, info.generation, info.version, info.format);
return sst->load(std::move(info)).then([sst, this] {
_unshared_local_sstables.push_back(sst);
return make_ready_future<>();
});
});
}
future<>
sstable_directory::remove_input_sstables_from_resharding(const std::vector<sstables::shared_sstable>& sstlist) {
dirlog.debug("Removing {} resharded SSTables", sstlist.size());
return parallel_for_each(sstlist, [] (sstables::shared_sstable sst) {
dirlog.trace("Removing resharded SSTable {}", sst->get_filename());
return sst->unlink();
});
}
future<>
sstable_directory::collect_output_sstables_from_resharding(std::vector<sstables::shared_sstable> resharded_sstables) {
dirlog.debug("Collecting {} resharded SSTables", resharded_sstables.size());
return parallel_for_each(std::move(resharded_sstables), [this] (sstables::shared_sstable sst) {
auto shards = sst->get_shards_for_this_sstable();
assert(shards.size() == 1);
auto shard = shards[0];
if (shard == this_shard_id()) {
dirlog.trace("Collected resharded SSTable {} already local", sst->get_filename());
_unshared_local_sstables.push_back(std::move(sst));
return make_ready_future<>();
}
dirlog.trace("Collected resharded SSTable {} is remote. Storing it", sst->get_filename());
return sst->get_open_info().then([this, shard, sst] (sstables::foreign_sstable_open_info info) {
_unshared_remote_sstables[shard].push_back(std::move(info));
return make_ready_future<>();
});
});
}
future<>
sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& cm, table& table,
unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
if (shared_info.empty()) {
return make_ready_future<>();
}
// We want to reshard many SSTables at a time for efficiency. However, if we take on
// too many at once we risk OOM.
auto num_jobs = shared_info.size() / max_sstables_per_job + 1;
auto sstables_per_job = shared_info.size() / num_jobs;
using reshard_buckets = std::vector<std::vector<sstables::shared_sstable>>;
return do_with(reshard_buckets(1), [this, &cm, &table, sstables_per_job, iop, num_jobs, creator = std::move(creator), shared_info = std::move(shared_info)] (reshard_buckets& buckets) mutable {
return parallel_for_each(shared_info, [this, sstables_per_job, num_jobs, &buckets] (sstables::foreign_sstable_open_info& info) {
auto sst = _sstable_object_from_existing_sstable(_sstable_dir, info.generation, info.version, info.format);
return sst->load(std::move(info)).then([this, &buckets, sstables_per_job, num_jobs, sst = std::move(sst)] () mutable {
// Last bucket gets leftover SSTables
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
buckets.emplace_back();
}
buckets.back().push_back(std::move(sst));
});
}).then([this, &cm, &table, &buckets, iop, creator = std::move(creator)] () mutable {
// The compaction manager limits concurrency internally: run_resharding_job holds a
// semaphore. We still submit every bucket with parallel_for_each so that the
// statistics about pending jobs reflect all of them, but only one runs at a time.
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return cm.run_resharding_job(&table, [this, iop, &cm, &table, creator, &sstlist] () {
sstables::compaction_descriptor desc(sstlist, {}, iop);
desc.options = sstables::compaction_options::make_reshard();
desc.creator = std::move(creator);
return sstables::compact_sstables(std::move(desc), table).then([this, &sstlist] (sstables::compaction_info result) {
return when_all_succeed(collect_output_sstables_from_resharding(std::move(result.new_sstables)), remove_input_sstables_from_resharding(sstlist));
});
});
});
});
});
}
future<>
sstable_directory::do_for_each_sstable(std::function<future<>(sstables::shared_sstable)> func) {
return parallel_for_each_restricted(_unshared_local_sstables, std::move(func));
}
template <typename Container, typename Func>
future<>
sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) {
return parallel_for_each(C, [this, func = std::move(func)] (auto& el) mutable {
return with_semaphore(_load_semaphore, 1, [this, func, el = std::move(el)] () mutable {
return func(el);
});
});
}
void
sstable_directory::store_phaser(utils::phased_barrier::operation op) {
_operation_barrier.emplace(std::move(op));
}
sstable_directory::sstable_info_vector
sstable_directory::retrieve_shared_sstables() {
return std::exchange(_shared_sstable_info, {});
}
}


@@ -23,9 +23,169 @@
#include "lister.hh"
#include <filesystem>
#include <seastar/core/file.hh>
#include <seastar/core/sharded.hh>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <functional>
#include "seastarx.hh"
#include "sstables/shared_sstable.hh" // sstables::shared_sstable
#include "sstables/version.hh" // sstable versions
#include "sstables/compaction_descriptor.hh" // for compaction_sstable_creator_fn
#include "sstables/open_info.hh" // for entry_descriptor and foreign_sstable_open_info, chunked_vector wants to know if they are move constructible
#include "utils/chunked_vector.hh"
#include "utils/phased_barrier.hh"
class compaction_manager;
namespace sstables {
bool manifest_json_filter(const std::filesystem::path&, const directory_entry& entry);
// Handles a directory containing SSTables. It could be an auxiliary directory (like upload),
// or the main directory.
class sstable_directory {
public:
using lack_of_toc_fatal = bool_class<class lack_of_toc_fatal_tag>;
using need_mutate_level = bool_class<class need_mutate_level_tag>;
using enable_dangerous_direct_import_of_cassandra_counters = bool_class<class enable_dangerous_direct_import_of_cassandra_counters_tag>;
using allow_loading_materialized_view = bool_class<class allow_loading_materialized_view_tag>;
using sstable_object_from_existing_fn =
noncopyable_function<sstables::shared_sstable(std::filesystem::path,
int64_t,
sstables::sstable_version_types,
sstables::sstable_format_types)>;
// favor chunked vectors when dealing with file lists: they can grow to hundreds of thousands
// of elements.
using sstable_info_vector = utils::chunked_vector<sstables::foreign_sstable_open_info>;
private:
using scan_multimap = std::unordered_multimap<int64_t, std::filesystem::path>;
using scan_descriptors = utils::chunked_vector<sstables::entry_descriptor>;
struct scan_state {
scan_multimap generations_found;
scan_descriptors temp_toc_found;
scan_descriptors descriptors;
};
// SSTable files to be deleted: things with a Temporary TOC, missing TOC files,
// TemporaryStatistics, etc. Not part of the scan state, because we want to do a 2-phase
// delete: maybe one of the shards will have signaled an error. And in the case of an error
// we don't want to delete anything.
std::unordered_set<sstring> _files_for_removal;
// prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation.
// Will be destroyed when this object is destroyed.
std::optional<utils::phased_barrier::operation> _operation_barrier;
std::filesystem::path _sstable_dir;
// We may have hundreds of thousands of files to load. To protect against OOMs we will limit
// how many of them we process at the same time.
semaphore _load_semaphore;
// Flags below control how to behave when scanning new SSTables.
need_mutate_level _need_mutate_level;
lack_of_toc_fatal _throw_on_missing_toc;
enable_dangerous_direct_import_of_cassandra_counters _enable_dangerous_direct_import_of_cassandra_counters;
allow_loading_materialized_view _allow_loading_materialized_view;
// How to create an SSTable object from an existing SSTable file (respecting generation, etc)
sstable_object_from_existing_fn _sstable_object_from_existing_sstable;
int64_t _max_generation_seen = 0;
sstables::sstable_version_types _max_version_seen = sstables::sstable_version_types::ka;
// SSTables that are unshared and belong to this shard. They are already stored as an
// SSTable object.
utils::chunked_vector<sstables::shared_sstable> _unshared_local_sstables;
// SSTables that are unshared and belong to foreign shards. Because they are more conveniently
// stored as a foreign_sstable_open_info object, they are in a different attribute separate from the
// local SSTables.
//
// The indexes of the outer vector represent the shards. Having anything in the index
// representing this shard is illegal.
std::vector<sstable_info_vector> _unshared_remote_sstables;
// SSTables that are shared. Stored as foreign_sstable_open_info objects. Reason is those are
// the SSTables that we found, and not necessarily the ones we will reshard. We want to balance
// the amount of data resharded per shard, so a coordinator may redistribute this.
sstable_info_vector _shared_sstable_info;
future<> process_descriptor(sstables::entry_descriptor desc, const ::io_priority_class& iop);
void validate(sstables::shared_sstable sst) const;
void handle_component(scan_state& state, sstables::entry_descriptor desc, std::filesystem::path filename);
future<> remove_input_sstables_from_resharding(const std::vector<sstables::shared_sstable>& sstlist);
future<> collect_output_sstables_from_resharding(std::vector<sstables::shared_sstable> resharded_sstables);
template <typename Container, typename Func>
future<> parallel_for_each_restricted(Container&& C, Func&& func);
future<> load_foreign_sstables(sstable_info_vector info_vec);
public:
sstable_directory(std::filesystem::path sstable_dir,
unsigned load_parallelism,
need_mutate_level need_mutate,
lack_of_toc_fatal fatal_nontoc,
enable_dangerous_direct_import_of_cassandra_counters eddiocc,
allow_loading_materialized_view,
sstable_object_from_existing_fn sstable_from_existing);
// moves unshared SSTables that don't belong to this shard to the right shards.
future<> move_foreign_sstables(sharded<sstable_directory>& source_directory);
// returns what is the highest generation seen in this directory.
int64_t highest_generation_seen() const;
// returns what is the highest version seen in this directory.
sstables::sstable_version_types highest_version_seen() const;
// scans a directory containing SSTables. Every generation that is believed to belong to this
// shard is processed, the ones that are not are skipped. Potential pertinence is decided as
// generation % smp::count.
//
// Once this method returns, every SSTable that this shard processed can be in one of 3 states:
//  - unshared, local : not a shared SSTable, and indeed belongs to this shard.
//  - unshared, remote: not a shared SSTable, but belongs to a remote shard.
//  - shared          : a shared SSTable that belongs to many shards. Must be resharded before use.
//
// This function doesn't change on-storage state. If files are to be removed, a separate call
// (commit_directory_changes()) has to be issued. This is to make sure that all instances of
// this class in a sharded service have the opportunity to validate their files.
future<> process_sstable_dir(const ::io_priority_class& iop);
// If files were scheduled to be removed, they will be removed after this call.
future<> commit_directory_changes();
// reshards a collection of SSTables.
//
// A reference to the compaction manager must be passed so we can register with it. Knowing
// which table is being processed is a requirement of the compaction manager, so this must be
// passed too.
//
// We will reshard max_sstables_per_job at once.
//
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstable_info_vector info, compaction_manager& cm, table& table,
unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator,
const ::io_priority_class& iop);
// Store a phased operation. Usually used to keep an object alive while the directory is being
// processed. One example is preventing table drops concurrent to the processing of this
// directory.
void store_phaser(utils::phased_barrier::operation op);
// Helper function that processes all unshared SSTables belonging to this shard, respecting the
// concurrency limit.
future<> do_for_each_sstable(std::function<future<>(sstables::shared_sstable)> func);
// Retrieves the list of shared SSTables in this object. The list will be reset once this
// is called.
sstable_info_vector retrieve_shared_sstables();
};
}

View File

@@ -0,0 +1,582 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/core/sstring.hh>
#include "sstables/sstable_directory.hh"
#include "distributed_loader.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/tmpdir.hh"
#include "db/config.hh"
#include "fmt/format.h"
schema_ptr test_table_schema() {
static thread_local auto s = [] {
schema_builder builder(make_lw_shared(schema(
generate_legacy_id("ks", "cf"), "ks", "cf",
// partition key
{{"p", bytes_type}},
// clustering key
{},
// regular columns
{{"c", int32_type}},
// static columns
{},
// regular column name type
bytes_type,
// comment
""
)));
return builder.build(schema_builder::compact_storage::no);
}();
return s;
}
using namespace sstables;
future<sstables::shared_sstable>
make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factory) {
auto s = test_table_schema();
auto key_token_pair = token_generation_for_shard(1, this_shard_id(), 12);
auto key = partition_key::from_exploded(*s, {to_bytes(key_token_pair[0].first)});
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
return make_ready_future<sstables::shared_sstable>(make_sstable_containing(sst_factory, {m}));
}
/// Create a shared SSTable belonging to all shards for the following schema: "create table cf (p text PRIMARY KEY, c int)"
///
/// Arguments passed to the function are passed to table::make_sstable
template <typename... Args>
future<sstables::shared_sstable>
make_sstable_for_all_shards(database& db, table& table, fs::path sstdir, int64_t generation, Args&&... args) {
// Unlike the previous helper, we'll assume we're in a thread here. It's less flexible,
// but the users are usually in a thread, and rewrite_toc_without_scylla_component requires
// a thread. We could fix that, but we defer it for now.
auto s = table.schema();
auto mt = make_lw_shared<memtable>(s);
auto msb = db.get_config().murmur3_partitioner_ignore_msb_bits();
for (shard_id shard = 0; shard < smp::count; ++shard) {
auto key_token_pair = token_generation_for_shard(1, shard, msb);
auto key = partition_key::from_exploded(*s, {to_bytes(key_token_pair[0].first)});
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
mt->apply(std::move(m));
}
auto sst = table.make_sstable(sstdir.native(), generation++, args...);
write_memtable_to_sstable(*mt, sst, table.get_sstables_manager().configure_writer()).get();
mt->clear_gently().get();
// We can't write an SSTable with bad sharding, so pretend
// it came from Cassandra
sstables::test(sst).remove_component(sstables::component_type::Scylla).get();
sstables::test(sst).rewrite_toc_without_scylla_component();
return make_ready_future<sstables::shared_sstable>(sst);
}
sstables::shared_sstable sstable_from_existing_file(fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return test_sstables_manager.make_sstable(test_table_schema(), dir.native(), gen, v, f, gc_clock::now(), default_io_error_handler_gen(), default_sstable_buffer_size);
}
template <typename... Args>
sstables::shared_sstable new_sstable(fs::path dir, int64_t gen) {
return test_sstables_manager.make_sstable(test_table_schema(), dir.native(), gen,
sstables::sstable_version_types::mc, sstables::sstable_format_types::big,
gc_clock::now(), default_io_error_handler_gen(), default_sstable_buffer_size);
}
// There is code for this in distributed_loader.cc, but it is so simple that it is not worth
// polluting the public namespace for it. Repeat it here.
inline future<int64_t>
highest_generation_seen(sharded<sstables::sstable_directory>& dir) {
return dir.map_reduce0(std::mem_fn(&sstable_directory::highest_generation_seen), int64_t(0), [] (int64_t a, int64_t b) {
return std::max<int64_t>(a, b);
});
}
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) {
auto dir = tmpdir();
// Write a manifest file to make sure it's ignored
auto manifest = dir.path() / "manifest.json";
auto f = open_file_dma(manifest.native(), open_flags::wo | open_flags::create | open_flags::truncate).get0();
f.close().get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
// No generation found on empty directory.
BOOST_REQUIRE_EQUAL(max_generation_seen, 0);
}
// Test unrecoverable SSTable: missing a file that is expected in the TOC.
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
// Now there is one SSTable in the directory, but it is incomplete: one component is missing.
// We should fail validation and leave the directory untouched.
remove_file(sst->filename(sstables::component_type::Statistics)).get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
}
// Test always-benign incomplete SSTable: temporaryTOC found
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_temporary_toc) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
auto expect_ok = distributed_loader::process_sstable_dir(sstdir);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
}
// Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
remove_file(sst->filename(sstables::component_type::TOC)).get();
sharded<sstable_directory> sstdir_fatal;
sstdir_fatal.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop_fatal = defer([&sstdir_fatal] {
sstdir_fatal.stop().get();
});
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
sharded<sstable_directory> sstdir_ok;
sstdir_ok.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop_ok = defer([&sstdir_ok] {
sstdir_ok.stop().get();
});
auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
}
// Test the presence of TemporaryStatistics. If the old Statistics file is around
// this is benign and we'll just delete it and move on. If the old Statistics file
// is not around (but mentioned in the TOC), then this is an error.
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
auto tempstr = sst->filename(dir.path().native(), component_type::TemporaryStatistics);
auto f = open_file_dma(tempstr, open_flags::rw | open_flags::create | open_flags::truncate).get0();
f.close().get();
auto tempstat = fs::canonical(fs::path(tempstr));
sharded<sstable_directory> sstdir_ok;
sstdir_ok.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop_ok = defer([&sstdir_ok] {
sstdir_ok.stop().get();
});
auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
lister::scan_dir(dir.path(), { directory_entry_type::regular }, [tempstat] (fs::path parent_dir, directory_entry de) {
BOOST_REQUIRE(fs::canonical(parent_dir / fs::path(de.name)) != tempstat);
return make_ready_future<>();
}).get();
remove_file(sst->filename(sstables::component_type::Statistics)).get();
sharded<sstable_directory> sstdir_fatal;
sstdir_fatal.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop_fatal = defer([&sstdir_fatal] {
sstdir_fatal.stop().get();
});
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
}
// Test that we see the right generation during the scan. Temporary files are skipped
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
auto dir = tmpdir();
make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 3333)).get0();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 6666)).get0();
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
BOOST_REQUIRE_EQUAL(max_generation_seen, 3333);
}
future<> verify_that_all_sstables_are_local(sharded<sstable_directory>& sstdir, unsigned expected_sstables) {
return do_with(std::make_unique<std::atomic<unsigned>>(0), [&sstdir, expected_sstables] (std::unique_ptr<std::atomic<unsigned>>& count) {
return sstdir.invoke_on_all([count = count.get()] (sstable_directory& d) {
return d.do_for_each_sstable([count] (sstables::shared_sstable sst) {
count->fetch_add(1, std::memory_order_relaxed);
auto shards = sst->get_shards_for_this_sstable();
BOOST_REQUIRE_EQUAL(shards.size(), 1);
BOOST_REQUIRE_EQUAL(shards[0], this_shard_id());
return make_ready_future<>();
});
}).then([count = count.get(), expected_sstables] {
BOOST_REQUIRE_EQUAL(count->load(std::memory_order_relaxed), expected_sstables);
return make_ready_future<>();
});
});
}
// Test that all SSTables are seen as unshared, if the generation numbers match what their
// shard-assignments expect
SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_generations) {
auto dir = tmpdir();
for (shard_id i = 0; i < smp::count; ++i) {
smp::submit_to(i, [dir = dir.path(), i] {
// This is why it is annoying for the internal functions in the test infrastructure to
// assume threaded execution.
return seastar::async([dir, i] {
make_sstable_for_this_shard(std::bind(new_sstable, dir, i)).get0();
});
}).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
}
// Test that all SSTables are seen as unshared, even if the generation numbers do not match what their
// shard-assignments expect
SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_generations) {
auto dir = tmpdir();
for (shard_id i = 0; i < smp::count; ++i) {
smp::submit_to(i, [dir = dir.path(), i] {
// This is why it is annoying for the internal functions in the test infrastructure to
// assume threaded execution.
return seastar::async([dir, i] {
make_sstable_for_this_shard(std::bind(new_sstable, dir, i + 1)).get0();
});
}).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
}
// Test that the sstable_dir object can keep the table alive against a drop
SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto ks_name = "ks";
auto cf_name = "cf";
auto& cf = e.local_db().find_column_family(ks_name, cf_name);
auto path = fs::path(cf.dir());
sharded<sstable_directory> sstdir;
sstdir.start(path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
// Stop cleanly in case we fail early for unexpected reasons.
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::lock_table(sstdir, e.db(), ks_name, cf_name).get();
auto drop = e.execute_cql("drop table cf");
later().get();
auto table_ok = e.db().invoke_on_all([ks_name, cf_name] (database& db) {
db.find_column_family(ks_name, cf_name);
});
BOOST_REQUIRE_NO_THROW(table_ok.get());
// Stop manually now, to allow for the object to be destroyed and take the
// phaser with it.
stop.cancel();
sstdir.stop().get();
drop.get();
auto no_such_table = e.db().invoke_on_all([ks_name, cf_name] (database& db) {
db.find_column_family(ks_name, cf_name);
return make_ready_future<>();
});
BOOST_REQUIRE_THROW(no_such_table.get(), no_such_column_family);
});
}
SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
if (smp::count == 1) {
fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n");
return make_ready_future<>();
}
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
auto upload_path = fs::path(cf.dir()) / "upload";
e.db().invoke_on_all([] (database& db) {
auto& cf = db.find_column_family("ks", "cf");
cf.disable_auto_compaction();
}).get();
unsigned num_sstables = 10 * smp::count;
auto generation = 0;
for (unsigned nr = 0; nr < num_sstables; ++nr) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(upload_path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
[&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<int64_t> generation_for_test = {};
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed);
distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) {
auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed);
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);
}).get();
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get();
});
}
SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_even_if_files_are_not_well_distributed) {
if (smp::count == 1) {
fmt::print("Skipping sstable_directory_shared_sstables_reshard_distributes_well_even_if_files_are_not_well_distributed, smp == 1\n");
return make_ready_future<>();
}
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
auto upload_path = fs::path(cf.dir()) / "upload";
e.db().invoke_on_all([] (database& db) {
auto& cf = db.find_column_family("ks", "cf");
cf.disable_auto_compaction();
}).get();
unsigned num_sstables = 10 * smp::count;
auto generation = 0;
for (unsigned nr = 0; nr < num_sstables; ++nr) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++ * smp::count, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(upload_path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
[&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<int64_t> generation_for_test = {};
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed);
distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) {
auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed);
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);
}).get();
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get();
});
}
SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshold) {
if (smp::count == 1) {
fmt::print("Skipping sstable_directory_shared_sstables_reshard_respect_max_threshold, smp == 1\n");
return make_ready_future<>();
}
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
auto upload_path = fs::path(cf.dir()) / "upload";
e.db().invoke_on_all([] (database& db) {
auto& cf = db.find_column_family("ks", "cf");
cf.disable_auto_compaction();
}).get();
unsigned num_sstables = (cf.schema()->max_compaction_threshold() + 1) * smp::count;
auto generation = 0;
for (unsigned nr = 0; nr < num_sstables; ++nr) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(upload_path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
[&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
std::atomic<int64_t> generation_for_test = {};
generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed);
distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) {
auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed);
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);
}).get();
verify_that_all_sstables_are_local(sstdir, 2 * smp::count * smp::count).get();
});
}