compaction_manager: Wire cleanup task into the strategy cleanup method
As the cleanup process can now be driven by the compaction strategy, move cleanup into a new task type that uses the new compaction_strategy::get_cleanup_compaction_jobs(). For the time being, all strategies use the default method, which returns one descriptor for each sstable that needs cleanup. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -1095,7 +1095,9 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
||||
template<typename TaskType, typename... Args>
|
||||
requires std::derived_from<TaskType, compaction_manager::task>
|
||||
future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
|
||||
if (_state != state::enabled) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1119,7 +1121,11 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
return a->data_size() > b->data_size();
|
||||
});
|
||||
});
|
||||
co_await perform_task(seastar::make_shared<rewrite_sstables_compaction_task>(*this, t, std::move(options), std::move(sstables), std::move(compacting), can_purge));
|
||||
co_await perform_task(seastar::make_shared<TaskType>(*this, t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
||||
return perform_task_on_all_files<rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
|
||||
}
|
||||
|
||||
class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
|
||||
@@ -1177,6 +1183,74 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table*
|
||||
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
|
||||
}
|
||||
|
||||
// Task that drives cleanup for a table by running the cleanup jobs produced by
// the table's compaction strategy (compaction_strategy::get_cleanup_compaction_jobs()),
// one job at a time, under a maintenance-operation permit.
class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task {
    // Options (of type cleanup) applied to every job; stamped onto each
    // descriptor right before it runs.
    const sstables::compaction_type_options _cleanup_options;
    // Keeps the candidate sstables registered as "compacting" for the lifetime
    // of the task; individual sstables are released as jobs exhaust them.
    compacting_sstable_registration _compacting;
    // Jobs still to run. Sorted largest-first so that pop_back() consumes the
    // smallest job next (see constructor comment).
    std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
public:
    cleanup_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options,
                                     std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
        : task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
        , _cleanup_options(std::move(options))
        , _compacting(std::move(compacting))
        , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates))
    {
        // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs
        // will have more space available released by previous jobs.
        std::ranges::sort(_pending_cleanup_jobs, std::ranges::greater(), std::mem_fn(&sstables::compaction_descriptor::sstables_size));
        _cm._stats.pending_tasks += _pending_cleanup_jobs.size();
    }

    virtual ~cleanup_sstables_compaction_task() {
        // Jobs that never ran (task stopped or failed) are still accounted as
        // pending; undo their contribution on destruction.
        _cm._stats.pending_tasks -= _pending_cleanup_jobs.size();
    }
protected:
    // Runs all pending cleanup jobs sequentially. Stops early if the task can
    // no longer proceed (e.g. it was asked to stop).
    virtual future<> do_run() override {
        switch_state(state::pending);
        // Cleanup competes with other maintenance operations for a semaphore unit.
        auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1);

        while (!_pending_cleanup_jobs.empty() && can_proceed()) {
            auto active_job = std::move(_pending_cleanup_jobs.back());
            active_job.options = _cleanup_options;
            co_await run_cleanup_job(std::move(active_job));
            // Only drop the job (and its pending-task accounting) after it
            // completed, so a failure keeps the count consistent with the dtor.
            _pending_cleanup_jobs.pop_back();
            _cm._stats.pending_tasks--;
        }
    }
private:
    // Releases reference to cleaned files such that respective used disk space can be freed.
    void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
        _compacting.release_compacting(exhausted_sstables);
    }

    // Runs a single cleanup job in the compaction scheduling group, retrying
    // via maybe_retry() on failure; rethrows once retries are exhausted.
    future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
        co_await coroutine::switch_to(_cm._compaction_controller.sg());

        for (;;) {
            // Backlog tracker for this user-initiated operation (200 shares).
            compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
            _cm.register_backlog_tracker(user_initiated);

            std::exception_ptr ex;
            try {
                setup_new_compaction(descriptor.run_identifier);
                co_await compact_sstables(descriptor, _compaction_data, [this] (std::vector<sstables::shared_sstable> exhausted_sstables) {
                    release_exhausted(std::move(exhausted_sstables));
                });
                finish_compaction();
                _cm.reevaluate_postponed_compactions();
                co_return; // done with current job
            } catch (...) {
                ex = std::current_exception();
            }

            finish_compaction(state::failed);
            // retry current job or rethrows exception
            if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
                co_return;
            }
        }
    }
};
|
||||
|
||||
bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
const dht::token_range_vector& sorted_owned_ranges,
|
||||
schema_ptr s) {
|
||||
@@ -1208,8 +1282,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
|
||||
});
|
||||
};
|
||||
if (check_for_cleanup()) {
|
||||
return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
|
||||
t->schema()->ks_name(), t->schema()->cf_name())));
|
||||
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
|
||||
t->schema()->ks_name(), t->schema()->cf_name()));
|
||||
}
|
||||
|
||||
auto sorted_owned_ranges = db.get_keyspace_local_ranges(t->schema()->ks_name());
|
||||
@@ -1226,7 +1300,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
|
||||
});
|
||||
};
|
||||
|
||||
return rewrite_sstables(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables));
|
||||
co_await perform_task_on_all_files<cleanup_sstables_compaction_task>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
|
||||
std::move(get_sstables));
|
||||
}
|
||||
|
||||
// Submit a table to be upgraded and wait for its termination.
|
||||
|
||||
@@ -222,6 +222,7 @@ public:
|
||||
class regular_compaction_task;
|
||||
class offstrategy_compaction_task;
|
||||
class rewrite_sstables_compaction_task;
|
||||
class cleanup_sstables_compaction_task;
|
||||
class validate_sstables_compaction_task;
|
||||
class compaction_manager_test_task;
|
||||
|
||||
@@ -327,6 +328,12 @@ private:
|
||||
|
||||
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
|
||||
|
||||
// Guarantees that a maintenance task, e.g. cleanup, will be performed on all files available at the time
|
||||
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
|
||||
template<typename TaskType, typename... Args>
|
||||
requires std::derived_from<TaskType, task>
|
||||
future<> perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func, Args... args);
|
||||
|
||||
future<> rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
|
||||
public:
|
||||
compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as);
|
||||
|
||||
Reference in New Issue
Block a user