compaction: move output run id from compaction_info into task
this run id is used to track partial runs that are being written to. let's move it from info into task, as this is not an external info, but rather one that belongs to compaction_manager. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -533,7 +533,6 @@ public:
|
||||
info->ks_name = cf.schema()->ks_name();
|
||||
info->cf_name = cf.schema()->cf_name();
|
||||
info->type = descriptor.options.type();
|
||||
info->run_identifier = descriptor.run_identifier;
|
||||
info->cf = &cf;
|
||||
info->compaction_uuid = utils::UUID_gen::get_time_UUID();
|
||||
return info;
|
||||
|
||||
@@ -66,7 +66,6 @@ namespace sstables {
|
||||
int64_t ended_at;
|
||||
std::vector<shared_sstable> new_sstables;
|
||||
sstring stop_requested;
|
||||
utils::UUID run_identifier;
|
||||
utils::UUID compaction_uuid;
|
||||
struct replacement {
|
||||
const std::vector<shared_sstable> removed;
|
||||
|
||||
@@ -178,8 +178,8 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const c
|
||||
candidates.reserve(cf.sstables_count());
|
||||
// prevents sstables that belongs to a partial run being generated by ongoing compaction from being
|
||||
// selected for compaction, which could potentially result in wrong behavior.
|
||||
auto partial_run_identifiers = boost::copy_range<std::unordered_set<utils::UUID>>(_compactions
|
||||
| boost::adaptors::transformed(std::mem_fn(&sstables::compaction_info::run_identifier)));
|
||||
auto partial_run_identifiers = boost::copy_range<std::unordered_set<utils::UUID>>(_tasks
|
||||
| boost::adaptors::transformed(std::mem_fn(&task::output_run_identifier)));
|
||||
auto& cs = cf.get_compaction_strategy();
|
||||
|
||||
// Filter out sstables that are being compacted.
|
||||
@@ -258,6 +258,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
|
||||
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
compacting->release_compacting(exhausted_sstables);
|
||||
};
|
||||
task->output_run_identifier = descriptor.run_identifier;
|
||||
|
||||
cmlog.info0("User initiated compaction started on behalf of {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name());
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
@@ -580,6 +581,7 @@ void compaction_manager::submit(column_family* cf) {
|
||||
column_family& cf = *task->compacting_cf;
|
||||
sstables::compaction_strategy cs = cf.get_compaction_strategy();
|
||||
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(cf, get_candidates(cf));
|
||||
task->output_run_identifier = descriptor.run_identifier;
|
||||
int weight = calculate_weight(descriptor.sstables);
|
||||
|
||||
if (descriptor.sstables.empty() || !can_proceed(task) || cf.is_auto_compaction_disabled_by_user()) {
|
||||
@@ -721,6 +723,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
auto sstable_set_snapshot = can_purge ? std::make_optional(cf.get_sstable_set()) : std::nullopt;
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), _maintenance_sg.io,
|
||||
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
task->output_run_identifier = run_identifier;
|
||||
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
|
||||
@@ -73,6 +73,7 @@ private:
|
||||
bool stopping = false;
|
||||
sstables::compaction_type type = sstables::compaction_type::Compaction;
|
||||
bool compaction_running = false;
|
||||
utils::UUID output_run_identifier;
|
||||
};
|
||||
|
||||
// compaction manager may have N fibers to allow parallel compaction per shard.
|
||||
@@ -280,6 +281,7 @@ public:
|
||||
|
||||
friend class compacting_sstable_registration;
|
||||
friend class compaction_weight_registration;
|
||||
friend class compaction_manager_test;
|
||||
};
|
||||
|
||||
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);
|
||||
|
||||
@@ -126,7 +126,7 @@ static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, rea
|
||||
return sst->as_mutation_source().make_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(compaction_manager_test) {
|
||||
SEASTAR_TEST_CASE(compaction_manager_basic_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
BOOST_REQUIRE(smp::count == 1);
|
||||
auto s = make_shared_schema({}, some_keyspace, some_column_family,
|
||||
@@ -3048,9 +3048,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
|
||||
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
|
||||
|
||||
// register partial sstable run
|
||||
auto c_info = make_lw_shared<compaction_info>();
|
||||
c_info->run_identifier = partial_sstable_run_identifier;
|
||||
cm->register_compaction(c_info);
|
||||
compaction_manager_test(*cm).register_compaction(*cf, partial_sstable_run_identifier);
|
||||
|
||||
cf->compact_all_sstables().get();
|
||||
|
||||
|
||||
@@ -343,6 +343,20 @@ public:
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
class compaction_manager_test {
|
||||
compaction_manager& _cm;
|
||||
public:
|
||||
explicit compaction_manager_test(compaction_manager& cm) : _cm(cm) {}
|
||||
|
||||
void register_compaction(column_family& cf, utils::UUID output_run_id) {
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
task->compacting_cf = &cf;
|
||||
task->compaction_running = true;
|
||||
task->output_run_identifier = std::move(output_run_id);
|
||||
_cm._tasks.push_back(task);
|
||||
}
|
||||
};
|
||||
|
||||
future<compaction_info> compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf,
|
||||
std::function<shared_sstable()> creator, sstables::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user