compaction: move output run id from compaction_info into task

this run id is used to track partial runs that are being written to.
let's move it from info into task, as this is not an external info,
but rather one that belongs to compaction_manager.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2021-09-30 13:12:16 -03:00
parent 1edcc3a218
commit 1f5b17fdc5
6 changed files with 23 additions and 8 deletions

View File

@@ -533,7 +533,6 @@ public:
info->ks_name = cf.schema()->ks_name();
info->cf_name = cf.schema()->cf_name();
info->type = descriptor.options.type();
info->run_identifier = descriptor.run_identifier;
info->cf = &cf;
info->compaction_uuid = utils::UUID_gen::get_time_UUID();
return info;

View File

@@ -66,7 +66,6 @@ namespace sstables {
int64_t ended_at;
std::vector<shared_sstable> new_sstables;
sstring stop_requested;
utils::UUID run_identifier;
utils::UUID compaction_uuid;
struct replacement {
const std::vector<shared_sstable> removed;

View File

@@ -178,8 +178,8 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const c
candidates.reserve(cf.sstables_count());
// prevents sstables that belongs to a partial run being generated by ongoing compaction from being
// selected for compaction, which could potentially result in wrong behavior.
auto partial_run_identifiers = boost::copy_range<std::unordered_set<utils::UUID>>(_compactions
| boost::adaptors::transformed(std::mem_fn(&sstables::compaction_info::run_identifier)));
auto partial_run_identifiers = boost::copy_range<std::unordered_set<utils::UUID>>(_tasks
| boost::adaptors::transformed(std::mem_fn(&task::output_run_identifier)));
auto& cs = cf.get_compaction_strategy();
// Filter out sstables that are being compacted.
@@ -258,6 +258,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting->release_compacting(exhausted_sstables);
};
task->output_run_identifier = descriptor.run_identifier;
cmlog.info0("User initiated compaction started on behalf of {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name());
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
@@ -580,6 +581,7 @@ void compaction_manager::submit(column_family* cf) {
column_family& cf = *task->compacting_cf;
sstables::compaction_strategy cs = cf.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(cf, get_candidates(cf));
task->output_run_identifier = descriptor.run_identifier;
int weight = calculate_weight(descriptor.sstables);
if (descriptor.sstables.empty() || !can_proceed(task) || cf.is_auto_compaction_disabled_by_user()) {
@@ -721,6 +723,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
auto sstable_set_snapshot = can_purge ? std::make_optional(cf.get_sstable_set()) : std::nullopt;
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), _maintenance_sg.io,
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
task->output_run_identifier = run_identifier;
// Releases reference to cleaned sstable such that respective used disk space can be freed.
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {

View File

@@ -73,6 +73,7 @@ private:
bool stopping = false;
sstables::compaction_type type = sstables::compaction_type::Compaction;
bool compaction_running = false;
utils::UUID output_run_identifier;
};
// compaction manager may have N fibers to allow parallel compaction per shard.
@@ -280,6 +281,7 @@ public:
friend class compacting_sstable_registration;
friend class compaction_weight_registration;
friend class compaction_manager_test;
};
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);

View File

@@ -126,7 +126,7 @@ static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, rea
return sst->as_mutation_source().make_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
}
SEASTAR_TEST_CASE(compaction_manager_test) {
SEASTAR_TEST_CASE(compaction_manager_basic_test) {
return test_env::do_with_async([] (test_env& env) {
BOOST_REQUIRE(smp::count == 1);
auto s = make_shared_schema({}, some_keyspace, some_column_family,
@@ -3048,9 +3048,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
// register partial sstable run
auto c_info = make_lw_shared<compaction_info>();
c_info->run_identifier = partial_sstable_run_identifier;
cm->register_compaction(c_info);
compaction_manager_test(*cm).register_compaction(*cf, partial_sstable_run_identifier);
cf->compact_all_sstables().get();

View File

@@ -343,6 +343,20 @@ public:
} // namespace sstables
class compaction_manager_test {
compaction_manager& _cm;
public:
explicit compaction_manager_test(compaction_manager& cm) : _cm(cm) {}
void register_compaction(column_family& cf, utils::UUID output_run_id) {
auto task = make_lw_shared<compaction_manager::task>();
task->compacting_cf = &cf;
task->compaction_running = true;
task->output_run_identifier = std::move(output_run_id);
_cm._tasks.push_back(task);
}
};
future<compaction_info> compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf,
std::function<shared_sstable()> creator, sstables::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op());