distributed_loader: reshard before the node is made online

This patch moves the resharding process to use the new
directory_with_sstables_handler infrastructure. There is no longer
a separate reshard step; resharding simply becomes a natural part of
populate_column_family.

In main.cc, a couple of changes are necessary to make that happen.
The first one obviously is to stop calling reshard. We also need to
make sure that:
 - The compaction manager is started much earlier, so we can register
   resharding jobs with it.
 - auto compactions are disabled in the populate method, so resharding
   doesn't have to fight for bandwidth with auto compactions.

Now that we are resharding through the sstable_directory, the old
resharding code can be deleted. There is also no need to deal with
the resharding backlog either, because the SSTables are not yet
added to the sstable set at this point.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2020-05-18 11:59:19 -04:00
parent b34c0c2ff6
commit e40aa042a7
13 changed files with 70 additions and 465 deletions

View File

@@ -63,8 +63,6 @@ public:
compaction_descriptor get_major_compaction_job(column_family& cf, std::vector<shared_sstable> candidates);
std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates);
// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added);

View File

@@ -724,6 +724,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.")
, max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20,
"Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.")
, initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u,
"Maximum amount of sstables to load in parallel during initialization. A higher number can lead to more memory consumption. You should not need to touch this")
, enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false,
"Set to true if the cluster was initially installed from 3.1.0. If it was upgraded from an earlier version,"
" or installed from a later version, leave this set to false. This adjusts the communication protocol to"

View File

@@ -304,6 +304,7 @@ public:
named_value<uint32_t> max_partition_key_restrictions_per_query;
named_value<uint32_t> max_clustering_key_restrictions_per_query;
named_value<uint64_t> max_memory_for_unlimited_query;
named_value<unsigned> initial_sstable_loading_concurrency;
named_value<bool> enable_3_1_0_compatibility_mode;
named_value<bool> enable_user_defined_functions;
named_value<unsigned> user_defined_function_time_limit_ms;

View File

@@ -113,87 +113,6 @@ public:
}
};
// Decides whether resharding would accomplish anything for this table.
// It is only worthwhile if at least one shard owns an sstable that is
// shared with another shard.
static future<bool> worth_resharding(distributed<database>& db, global_column_family_ptr cf) {
return db.map_reduce0([cf] (database& db) {
return cf->has_shared_sstables();
}, bool(false), std::logical_or<bool>());
}
// Loads, on the current shard, the sstables described by ssts_info that match
// pred. Each sstable object is created through cf->make_sstable() under dir
// and then loaded from its previously extracted open info. Returns the loaded
// sstables once all of them have finished loading.
static future<std::vector<sstables::shared_sstable>>
load_sstables_with_open_info(std::vector<sstables::foreign_sstable_open_info> ssts_info, global_column_family_ptr cf, sstring dir,
noncopyable_function<bool (const sstables::foreign_sstable_open_info&)> pred) {
return do_with(std::vector<sstables::shared_sstable>(), [ssts_info = std::move(ssts_info), cf, dir, pred = std::move(pred)] (auto& ssts) mutable {
// Load every accepted sstable concurrently, collecting results in ssts.
return parallel_for_each(std::move(ssts_info), [&ssts, cf, dir, pred = std::move(pred)] (auto& info) mutable {
if (!pred(info)) {
// Entry rejected by the caller's predicate - skip it.
return make_ready_future<>();
}
auto sst = cf->make_sstable(dir, info.generation, info.version, info.format);
return sst->load(std::move(info)).then([&ssts, sst] {
ssts.push_back(std::move(sst));
return make_ready_future<>();
});
}).then([&ssts] () mutable {
return std::move(ssts);
});
});
}
// Makes a set of sstables available at another shard: extracts each sstable's
// open info on the current shard, re-opens them on the target shard, and then
// invokes func there with the newly loaded sstables. Completes only after
// func's future resolves on the remote shard.
static future<> forward_sstables_to(shard_id shard, sstring directory, std::vector<sstables::shared_sstable> sstables, global_column_family_ptr cf,
std::function<future<> (std::vector<sstables::shared_sstable>)> func) {
return seastar::async([sstables = std::move(sstables), directory, shard, cf, func = std::move(func)] () mutable {
// get_open_info() returns a future; get0() is allowed here because we
// are running inside a seastar thread (seastar::async above).
auto infos = boost::copy_range<std::vector<sstables::foreign_sstable_open_info>>(sstables
| boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); }));
smp::submit_to(shard, [cf, func, infos = std::move(infos), directory] () mutable {
// Load everything we were handed; the always-true predicate means
// no entry is filtered out on the destination shard.
return load_sstables_with_open_info(std::move(infos), cf, directory, [] (auto& p) {
return true;
}).then([func] (std::vector<sstables::shared_sstable> sstables) {
return func(std::move(sstables));
});
}).get();
});
}
// Return all sstables that need resharding in the system. Only one instance of a shared sstable is returned.
static future<std::vector<sstables::shared_sstable>> get_all_shared_sstables(distributed<database>& db, sstring sstdir, global_column_family_ptr cf) {
// Reducer for map_reduce below: accumulates, on the calling shard, one
// loaded instance per shared-sstable generation reported by each shard.
class all_shared_sstables {
sstring _dir;
global_column_family_ptr _cf;
// Keyed by generation so that an sstable opened by several shards is
// only loaded (and returned) once.
std::unordered_map<int64_t, sstables::shared_sstable> _result;
public:
all_shared_sstables(const sstring& sstdir, global_column_family_ptr cf) : _dir(sstdir), _cf(cf) {}
future<> operator()(std::vector<sstables::foreign_sstable_open_info> ssts_info) {
return load_sstables_with_open_info(std::move(ssts_info), _cf, _dir, [this] (auto& info) {
// skip loading of shared sstable that is already stored in _result.
return !_result.count(info.generation);
}).then([this] (std::vector<sstables::shared_sstable> sstables) {
for (auto& sst : sstables) {
auto gen = sst->generation();
_result.emplace(gen, std::move(sst));
}
return make_ready_future<>();
});
}
std::vector<sstables::shared_sstable> get() && {
return boost::copy_range<std::vector<sstables::shared_sstable>>(std::move(_result) | boost::adaptors::map_values);
}
};
// Each shard reports the open info of its sstables needing rewrite that
// live under sstdir; the reducer above deduplicates them by generation.
return db.map_reduce(all_shared_sstables(sstdir, cf), [cf, sstdir] (database& db) mutable {
return seastar::async([cf, sstdir] {
return boost::copy_range<std::vector<sstables::foreign_sstable_open_info>>(cf->sstables_need_rewrite()
| boost::adaptors::filtered([sstdir] (auto&& sst) { return sst->get_dir() == sstdir; })
| boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); }));
});
});
}
template <typename... Args>
static inline
future<> verification_error(fs::path path, const char* fstr, Args&&... args) {
@@ -586,155 +505,6 @@ future<> distributed_loader::open_sstable(distributed<database>& db, sstables::e
});
}
// invokes each descriptor at its target shard, which involves forwarding sstables too.
// Each job is wrapped in a compaction-manager custom job named "resharding".
// NOTE(review): `directory` is a by-value parameter captured below by
// reference; this looks safe only if parallel_for_each invokes the lambda
// for every element before this function returns - confirm, or capture by
// value to be defensive.
static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, sstring directory, std::vector<sstables::resharding_descriptor> jobs,
std::function<future<> (std::vector<sstables::shared_sstable>, uint32_t level, uint64_t max_sstable_bytes)> func) {
return parallel_for_each(std::move(jobs), [cf, func, &directory] (sstables::resharding_descriptor& job) mutable {
return forward_sstables_to(job.reshard_at, directory, std::move(job.sstables), cf,
[cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
// compaction manager ensures that only one reshard operation will run per shard.
auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
return func(std::move(sstables), level, max_sstable_bytes);
};
return cf->get_compaction_manager().run_custom_job(&*cf, "resharding", std::move(job));
});
});
}
// Returns the subset of `sstables` whose owner-shard list contains `shard`.
static std::vector<sstables::shared_sstable> sstables_for_shard(const std::vector<sstables::shared_sstable>& sstables, shard_id shard) {
std::vector<sstables::shared_sstable> ret;
for (const auto& sst : sstables) {
const auto& shards = sst->get_shards_for_this_sstable();
if (std::find(shards.begin(), shards.end(), shard) != shards.end()) {
ret.push_back(sst);
}
}
return ret;
}
// Reshards every shared sstable of the given table so that each resulting
// sstable is owned by exactly one shard. Work proceeds in the background
// (the future is deliberately discarded - see FIXME below), serialized per
// table by a static semaphore.
void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sstring cf_name) {
assert(this_shard_id() == 0); // NOTE: should always run on shard 0!
// ensures that only one column family is resharded at a time (that's okay because
// actual resharding is parallelized), and that's needed to prevent the same column
// family from being resharded in parallel (that could happen, for example, if
// refresh (triggers resharding) is issued by user while resharding is going on).
static semaphore sem(1);
// FIXME: discarded future.
(void)with_semaphore(sem, 1, [&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable {
return seastar::async([&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable {
global_column_family_ptr cf(db, ks_name, cf_name);
// Nothing to do if the compaction manager is not enabled.
if (!cf->get_compaction_manager().enabled()) {
return;
}
// fast path to detect that this column family doesn't need reshard.
if (!worth_resharding(db, cf).get0()) {
dblog.debug("Nothing to reshard for {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name());
return;
}
// Every data directory is processed independently and in parallel.
parallel_for_each(cf->_config.all_datadirs, [&db, cf] (const sstring& directory) {
auto candidates = get_all_shared_sstables(db, directory, cf).get0();
dblog.debug("{} candidates for resharding for {}.{}", candidates.size(), cf->schema()->ks_name(), cf->schema()->cf_name());
auto jobs = cf->get_compaction_strategy().get_resharding_jobs(*cf, std::move(candidates));
dblog.debug("{} resharding jobs for {}.{}", jobs.size(), cf->schema()->ks_name(), cf->schema()->cf_name());
return invoke_all_resharding_jobs(cf, directory, std::move(jobs), [directory, &cf] (auto sstables, auto level, auto max_sstable_bytes) {
// FIXME: run it in maintenance priority.
// Resharding, currently, cannot provide compaction with a snapshot of the sstable set
// which spans all shards that input sstables belong to, so expiration is disabled.
std::optional<sstables::sstable_set> sstable_set = std::nullopt;
sstables::compaction_descriptor descriptor(sstables, std::move(sstable_set), service::get_local_compaction_priority(),
level, max_sstable_bytes);
descriptor.options = sstables::compaction_options::make_reshard();
descriptor.creator = [&cf, directory] (shard_id shard) mutable {
// we need generation calculated by instance of cf at requested shard,
// or resource usage wouldn't be fairly distributed among shards.
auto gen = smp::submit_to(shard, [&cf] () {
return cf->calculate_generation_for_new_table();
}).get0();
return cf->make_sstable(directory, gen,
cf->get_sstables_manager().get_highest_supported_format(),
sstables::sstable::format_types::big);
};
auto f = sstables::compact_sstables(std::move(descriptor), *cf);
return f.then([&cf, sstables = std::move(sstables), directory] (sstables::compaction_info info) mutable {
auto new_sstables = std::move(info.new_sstables);
// an input sstable may belong to shard 1 and 2 and only have data which
// token belongs to shard 1. That means resharding will only create a
// sstable for shard 1, but both shards opened the sstable. So our code
// below should ask both shards to remove the resharded table, or it
// wouldn't be deleted by our deletion manager, and resharding would be
// triggered again in the subsequent boot.
return parallel_for_each(boost::irange(0u, smp::count), [&cf, directory, sstables, new_sstables] (auto shard) {
auto old_sstables_for_shard = sstables_for_shard(sstables, shard);
// nothing to do if no input sstable belongs to this shard.
if (old_sstables_for_shard.empty()) {
return make_ready_future<>();
}
auto new_sstables_for_shard = sstables_for_shard(new_sstables, shard);
// sanity checks: every output sstable must be owned by exactly
// the one shard it was created for.
for (auto& sst : new_sstables_for_shard) {
auto& shards = sst->get_shards_for_this_sstable();
if (shards.size() != 1) {
throw std::runtime_error(format("resharded sstable {} doesn't belong to only one shard", sst->get_filename()));
}
if (shards.front() != shard) {
throw std::runtime_error(format("resharded sstable {} should belong to shard {:d}", sst->get_filename(), shard));
}
}
// Generations of the input (shared) sstables this shard had opened.
std::unordered_set<uint64_t> ancestors;
boost::range::transform(old_sstables_for_shard, std::inserter(ancestors, ancestors.end()),
std::mem_fn(&sstables::sstable::generation));
if (new_sstables_for_shard.empty()) {
// handles case where sstable needing rewrite doesn't produce any sstable
// for a shard it belongs to when resharded (the reason is explained above).
return smp::submit_to(shard, [cf, ancestors = std::move(ancestors)] () mutable {
return cf->remove_ancestors_needed_rewrite(ancestors);
});
} else {
return forward_sstables_to(shard, directory, new_sstables_for_shard, cf, [cf, ancestors = std::move(ancestors)] (std::vector<sstables::shared_sstable> sstables) mutable {
return cf->replace_ancestors_needed_rewrite(std::move(ancestors), std::move(sstables));
});
}
}).then([&cf, sstables] {
// schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards.
(void)sstables::delete_atomically(sstables).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (...) {
dblog.warn("Exception in resharding when deleting sstable file: {}", eptr);
}
}).then([cf, sstables = std::move(sstables)] {
// Refresh cache's snapshot of shards involved in resharding to prevent the cache from
// holding reference to deleted files which results in disk space not being released.
std::unordered_set<shard_id> owner_shards;
for (auto& sst : sstables) {
const auto& shards = sst->get_shards_for_this_sstable();
owner_shards.insert(shards.begin(), shards.end());
// All shards already covered; no point scanning further.
if (owner_shards.size() == smp::count) {
break;
}
}
return parallel_for_each(std::move(owner_shards), [cf] (shard_id shard) {
return smp::submit_to(shard, [cf] () mutable {
cf->_cache.refresh_snapshot();
});
});
});
});
});
});
}).get();
});
});
}
future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<database>& db, sstring sstdir, sstring fname) {
using namespace sstables;
@@ -856,92 +626,6 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
});
}
// Scans a table's data directory, removing leftover temporary components and
// verifying that every sstable generation seen on disk has a proper TOC file.
// Throws malformed_sstable_exception (refusing to boot) on inconsistencies.
future<> distributed_loader::do_populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
// We can catch most errors when we try to load an sstable. But if the TOC
// file is the one missing, we won't try to load the sstable at all. This
// case is still an invalid case, but it is way easier for us to treat it
// by waiting for all files to be loaded, and then checking if we saw a
// file during scan_dir, without its corresponding TOC.
enum class component_status {
has_some_file,
has_toc_file,
has_temporary_toc_file,
};
// Best status observed so far for a generation, plus the version/format
// needed to reconstruct its file names later.
struct sstable_descriptor {
component_status status;
sstables::sstable::version_types version;
sstables::sstable::format_types format;
};
// Maps sstable generation -> what we have seen of it during the scan.
auto verifier = make_lw_shared<std::unordered_map<unsigned long, sstable_descriptor>>();
return do_with(std::vector<future<>>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector<future<>>& futures) {
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) {
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
// push future returned by probe_file into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) {
// Leftover temporary statistics files are simply deleted.
if (entry.component == component_type::TemporaryStatistics) {
return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation,
entry.format, component_type::TemporaryStatistics));
}
if (verifier->count(entry.generation)) {
if (verifier->at(entry.generation).status == component_status::has_toc_file) {
fs::path file_path(sstdir / de.name);
if (entry.component == component_type::TOC) {
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed", file_path.native());
} else if (entry.component == component_type::TemporaryTOC) {
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed", file_path.native());
}
} else if (entry.component == component_type::TOC) {
verifier->at(entry.generation).status = component_status::has_toc_file;
} else if (entry.component == component_type::TemporaryTOC) {
verifier->at(entry.generation).status = component_status::has_temporary_toc_file;
}
} else {
// First component seen for this generation.
if (entry.component == component_type::TOC) {
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_toc_file, entry.version, entry.format});
} else if (entry.component == component_type::TemporaryTOC) {
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_temporary_toc_file, entry.version, entry.format});
} else {
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_some_file, entry.version, entry.format});
}
}
return make_ready_future<>();
});
futures.push_back(std::move(f));
return make_ready_future<>();
}, &sstables::manifest_json_filter).then([&futures] {
return execute_futures(futures);
}).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] {
// Scan complete; act on what we recorded per generation.
return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) {
if (v.second.status == component_status::has_temporary_toc_file) {
unsigned long gen = v.first;
sstables::sstable::version_types version = v.second.version;
sstables::sstable::format_types format = v.second.format;
if (this_shard_id() != 0) {
dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
return make_ready_future<>();
}
// shard 0 is the responsible for removing a partial sstable.
return sstables::sstable::remove_sstable_with_temp_toc(ks, cf, sstdir, gen, version, format);
} else if (v.second.status != component_status::has_toc_file) {
throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable with generation {:d}!. Refusing to boot", sstdir, v.first));
}
return make_ready_future<>();
});
});
});
}
future<> distributed_loader::populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] {
assert(this_shard_id() == 0);
@@ -952,8 +636,61 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
if (exists) {
handle_sstables_pending_delete(pending_delete_dir).get();
}
// Second pass, cleanup sstables with temporary TOCs and load the rest.
do_populate_column_family(db, std::move(sstdir), std::move(ks), std::move(cf)).get();
global_column_family_ptr global_table(db, ks, cf);
sharded<sstables::sstable_directory> directory;
directory.start(fs::path(sstdir), db.local().get_config().initial_sstable_loading_concurrency(),
sstables::sstable_directory::need_mutate_level::yes,
sstables::sstable_directory::lack_of_toc_fatal::yes,
sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()),
sstables::sstable_directory::allow_loading_materialized_view::yes,
[&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&directory] {
directory.stop().get();
});
lock_table(directory, db, ks, cf).get();
process_sstable_dir(directory).get();
// If we are resharding system tables before we can read them, we will not
// know which is the highest format we support: this information is itself stored
// in the system tables. In that case we'll rely on what we find on disk: we'll
// at least not downgrade any files. If we already know that we support a higher
// format than the one we see then we use that.
auto sys_format = global_table->get_sstables_manager().get_highest_supported_format();
auto sst_version = highest_version_seen(directory, sys_format).get0();
auto generation = highest_generation_seen(directory).get0();
db.invoke_on_all([&global_table, generation] (database& db) {
global_table->update_sstables_known_generation(generation);
global_table->disable_auto_compaction();
return make_ready_future<>();
}).get();
reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [&global_table] () {
return global_table->calculate_generation_for_new_table();
}).get0();
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
}).get();
// The node is offline at this point so we are very lenient with what we consider
// offstrategy.
reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) {
auto gen = global_table->calculate_generation_for_new_table();
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
}).get();
directory.invoke_on_all([global_table] (sstables::sstable_directory& dir) {
return dir.do_for_each_sstable([&global_table] (sstables::shared_sstable sst) {
return global_table->add_sstable_and_update_cache(sst);
});
}).get();
});
}

View File

@@ -65,7 +65,6 @@ public:
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring cf_name);
static void reshard(distributed<database>& db, sstring ks_name, sstring cf_name);
static future<> open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func,
const io_priority_class& pc = default_priority_class());

19
main.cc
View File

@@ -762,6 +762,11 @@ int main(int ac, char** av) {
dirs.init(*cfg, bool(hinted_handoff_enabled)).get();
// We need the compaction manager ready early so we can reshard.
db.invoke_on_all([&proxy, &stop_signal] (database& db) {
db.get_compaction_manager().enable();
}).get();
// Initialization of a keyspace is done by shard 0 only. For system
// keyspace, the procedure will go through the hardcoded column
// families, and in each of them, it will load the sstables for all
@@ -926,8 +931,11 @@ int main(int ac, char** av) {
}
}
db.invoke_on_all([&proxy] (database& db) {
db.get_compaction_manager().enable();
db.invoke_on_all([] (database& db) {
for (auto& x : db.get_column_families()) {
table& t = *(x.second);
t.enable_auto_compaction();
}
}).get();
// If the same sstable is shared by several shards, it cannot be
@@ -941,13 +949,6 @@ int main(int ac, char** av) {
// group that was effectively used in the bulk of it (compaction). Soon it will become
// streaming
with_scheduling_group(dbcfg.compaction_scheduling_group, [&db] {
for (auto& x : db.local().get_column_families()) {
column_family& cf = *(x.second);
distributed_loader::reshard(db, cf.schema()->ks_name(), cf.schema()->cf_name());
}
}).get();
db.invoke_on_all([&proxy] (database& db) {
for (auto& x : db.get_column_families()) {
column_family& cf = *(x.second);

View File

@@ -390,34 +390,6 @@ public:
}
};
// Resharding doesn't really belong into any strategy, because it is not worried about laying out
// SSTables according to any strategy-specific criteria. So we will just make it proportional to
// the amount of data we still have to reshard.
//
// Although at first it may seem like we could improve this by tracking the ongoing reshard as well
// and reducing the backlog as we compact, that is not really true. Resharding is not really
// expected to get rid of data and it is usually just splitting data among shards. Whichever backlog
// we get rid of by tracking the compaction will come back as a big spike as we add this SSTable
// back to their rightful shard owners.
//
// So because the data is supposed to be constant, we will just add the total amount of data as the
// backlog.
class resharding_backlog_tracker final : public compaction_backlog_tracker::impl {
uint64_t _total_bytes = 0;
public:
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return _total_bytes;
}
virtual void add_sstable(sstables::shared_sstable sst) override {
_total_bytes += sst->data_size();
}
virtual void remove_sstable(sstables::shared_sstable sst) override {
_total_bytes -= sst->data_size();
}
};
class compaction {
protected:
column_family& _cf;
@@ -1247,7 +1219,6 @@ flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_co
class resharding_compaction final : public compaction {
shard_id _shard; // shard of current sstable writer
compaction_backlog_tracker _resharding_backlog_tracker;
// Partition count estimation for a shard S:
//
@@ -1277,14 +1248,10 @@ private:
public:
resharding_compaction(column_family& cf, sstables::compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor))
, _resharding_backlog_tracker(std::make_unique<resharding_backlog_tracker>())
, _estimation_per_shard(smp::count)
, _run_identifiers(smp::count)
{
cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker);
for (auto& sst : _sstables) {
_resharding_backlog_tracker.add_sstable(sst);
const auto& shards = sst->get_shards_for_this_sstable();
auto size = sst->bytes_on_disk();
auto estimated_partitions = sst->get_estimated_key_count();
@@ -1299,11 +1266,7 @@ public:
_info->type = compaction_type::Reshard;
}
~resharding_compaction() {
for (auto& s : _sstables) {
_resharding_backlog_tracker.remove_sstable(s);
}
}
~resharding_compaction() { }
// Use reader that makes sure no non-local mutation will not be filtered out.
flat_mutation_reader make_sstable_reader() const override {
@@ -1346,6 +1309,7 @@ public:
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &default_write_monitor();
// sstables generated for a given shard will share the same run identifier.
cfg.run_identifier = _run_identifiers.at(shard);
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(shard), cfg, get_encoding_stats(), _io_priority, shard), sst};

View File

@@ -50,13 +50,6 @@ namespace sstables {
friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput);
};
struct resharding_descriptor {
std::vector<sstables::shared_sstable> sstables;
uint64_t max_sstable_bytes;
shard_id reshard_at;
uint32_t level;
};
static inline sstring compaction_name(compaction_type type) {
switch (type) {
case compaction_type::Compaction:

View File

@@ -460,19 +460,6 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s
return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold;
}
std::vector<resharding_descriptor>
compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
clogger.debug("Trying to get resharding jobs for {}.{}...", cf.schema()->ks_name(), cf.schema()->cf_name());
// Default policy: one resharding job per candidate sstable, with no size
// cap, assigning target shards in round-robin order.
std::vector<resharding_descriptor> jobs;
jobs.reserve(candidates.size());
shard_id next_shard = 0;
for (auto& candidate : candidates) {
auto level = candidate->get_sstable_level();
jobs.push_back(resharding_descriptor{{std::move(candidate)}, std::numeric_limits<uint64_t>::max(), next_shard++ % smp::count, level});
}
return jobs;
}
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) {
return partition_estimate;
}
@@ -984,10 +971,6 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(column_famil
return _compaction_strategy_impl->get_major_compaction_job(cf, std::move(candidates));
}
// Delegates resharding-job computation to the concrete strategy implementation.
std::vector<resharding_descriptor> compaction_strategy::get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
return _compaction_strategy_impl->get_resharding_jobs(cf, std::move(candidates));
}
// Forwards the compaction-completion notification to the strategy implementation.
void compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
_compaction_strategy_impl->notify_completion(removed, added);
}

View File

@@ -73,7 +73,6 @@ public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates);
virtual std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates);
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {

View File

@@ -76,39 +76,6 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(colu
sst->get_sstable_level(), _max_sstable_size_in_mb*1024*1024);
}
// Builds resharding jobs from an LCS manifest: adjacent sstables of the same
// level are grouped together (up to smp::count per job) and jobs are spread
// over shards round-robin.
std::vector<resharding_descriptor> leveled_compaction_strategy::get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates) {
leveled_manifest manifest = leveled_manifest::create(cf, candidates, _max_sstable_size_in_mb, _stcs_options);
std::vector<resharding_descriptor> descriptors;
shard_id target_shard = 0;
auto get_shard = [&target_shard] { return target_shard++ % smp::count; };
// Basically, we'll iterate through all levels, and for each, we'll sort the
// sstables by first key because there's a need to reshard together adjacent
// sstables.
// The shard at which the job will run is chosen in a round-robin fashion.
for (auto level = 0U; level <= manifest.get_level_count(); level++) {
// Level 0 has no size cap; other levels use the configured maximum.
uint64_t max_sstable_size = !level ? std::numeric_limits<uint64_t>::max() : (_max_sstable_size_in_mb*1024*1024);
auto& sstables = manifest.get_level(level);
boost::sort(sstables, [] (auto& i, auto& j) {
return i->compare_by_first_key(*j) < 0;
});
resharding_descriptor current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level};
for (auto it = sstables.begin(); it != sstables.end(); it++) {
current_descriptor.sstables.push_back(*it);
auto next = std::next(it);
// Close the job when it reaches smp::count sstables or the level ends.
if (current_descriptor.sstables.size() == smp::count || next == sstables.end()) {
descriptors.push_back(std::move(current_descriptor));
current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level};
}
}
}
return descriptors;
}
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
if (removed.empty() || added.empty()) {
return;

View File

@@ -41,8 +41,6 @@ public:
virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates) override;
virtual std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates) override;
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) override;
// for each level > 0, get newest sstable and use its last key as last

View File

@@ -133,43 +133,6 @@ SEASTAR_TEST_CASE(sstable_resharding_test) {
});
}
// Verifies how each compaction strategy partitions candidate sstables into
// resharding jobs: STCS makes one job per sstable, LCS groups up to
// smp::count adjacent sstables per job.
SEASTAR_THREAD_TEST_CASE(sstable_resharding_strategy_tests) {
test_env env;
// Exercise every supported sstable version.
for (const auto version : all_sstable_versions) {
auto s = make_lw_shared(schema({}, "ks", "cf", {{"p1", utf8_type}}, {}, {}, {}, utf8_type));
// Builds a synthetic sstable with the given generation and key range,
// pinned at level 1 through injected stats metadata.
auto get_sstable = [&] (int64_t gen, sstring first_key, sstring last_key) mutable {
auto sst = env.make_sstable(s, "", gen, version, sstables::sstable::format_types::big);
stats_metadata stats = {};
stats.sstable_level = 1;
sstables::test(sst).set_values(std::move(first_key), std::move(last_key), std::move(stats));
return sst;
};
column_family_for_tests cf;
auto tokens = token_generation_for_current_shard(2);
auto stcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options());
auto lcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
auto sst1 = get_sstable(1, tokens[0].first, tokens[1].first);
auto sst2 = get_sstable(2, tokens[1].first, tokens[1].first);
{
// TODO: sstable_test runs with smp::count == 1, thus we will not be able to stress it more
// until we move this test case to sstable_resharding_test.
// STCS default policy: one job per candidate.
auto descriptors = stcs.get_resharding_jobs(*cf, { sst1, sst2 });
BOOST_REQUIRE(descriptors.size() == 2);
}
{
// LCS groups adjacent sstables, up to smp::count per job.
auto ssts = std::vector<sstables::shared_sstable>{ sst1, sst2 };
auto descriptors = lcs.get_resharding_jobs(*cf, ssts);
auto expected_jobs = (ssts.size()+smp::count-1)/smp::count;
BOOST_REQUIRE(descriptors.size() == expected_jobs);
}
}
}
SEASTAR_TEST_CASE(sstable_is_shared_correctness) {
return test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {