compaction_manager: Wire cleanup task into the strategy cleanup method

As the cleanup process can now be driven by the compaction strategy,
let's move cleanup into a new task type that uses the new
compaction_strategy::get_cleanup_compaction_jobs().

For the time being, all strategies use the default method, which returns
one descriptor for each sstable that needs cleanup.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2022-03-21 13:28:59 -03:00
parent 44e9e10414
commit c7826aa910
2 changed files with 87 additions and 5 deletions

View File

@@ -1095,7 +1095,9 @@ private:
}
};
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction_manager::task>
future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
if (_state != state::enabled) {
co_return;
}
@@ -1119,7 +1121,11 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
return a->data_size() > b->data_size();
});
});
co_await perform_task(seastar::make_shared<rewrite_sstables_compaction_task>(*this, t, std::move(options), std::move(sstables), std::move(compacting), can_purge));
co_await perform_task(seastar::make_shared<TaskType>(*this, t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
}
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
}
class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
@@ -1177,6 +1183,74 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table*
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
}
// Compaction-manager task that performs cleanup by running one compaction
// job per descriptor returned by the table's compaction strategy
// (compaction_strategy::get_cleanup_compaction_jobs()), computed once at
// construction time from the candidate sstables.
class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task {
// Options carrying the cleanup compaction type; stamped onto every job before it runs.
const sstables::compaction_type_options _cleanup_options;
// Keeps the candidate sstables registered as "compacting"; individual sstables
// are released as they are exhausted so their disk space can be reclaimed early.
compacting_sstable_registration _compacting;
// Jobs not yet run. Sorted descending by size, so the smallest job is at the
// back and is executed first (see the sort rationale in the constructor).
std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
public:
cleanup_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options,
std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
: task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
, _cleanup_options(std::move(options))
, _compacting(std::move(compacting))
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates))
{
// Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs
// will have more space available released by previous jobs.
std::ranges::sort(_pending_cleanup_jobs, std::ranges::greater(), std::mem_fn(&sstables::compaction_descriptor::sstables_size));
// Account all pending jobs in the manager-wide stats; the destructor
// subtracts whatever is still pending, so the counter stays balanced
// even if the task stops before draining every job.
_cm._stats.pending_tasks += _pending_cleanup_jobs.size();
}
virtual ~cleanup_sstables_compaction_task() {
_cm._stats.pending_tasks -= _pending_cleanup_jobs.size();
}
protected:
// Runs the pending jobs sequentially (smallest first) while can_proceed()
// allows, holding one unit of the maintenance-operations semaphore for the
// whole run so cleanup counts as a single maintenance operation.
virtual future<> do_run() override {
switch_state(state::pending);
auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1);
while (!_pending_cleanup_jobs.empty() && can_proceed()) {
auto active_job = std::move(_pending_cleanup_jobs.back());
active_job.options = _cleanup_options;
co_await run_cleanup_job(std::move(active_job));
// Dequeue and de-account the job only after run_cleanup_job returned;
// on unrecoverable failure it rethrows and the job stays pending.
_pending_cleanup_jobs.pop_back();
_cm._stats.pending_tasks--;
}
}
private:
// Releases reference to cleaned files such that respective used disk space can be freed.
void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
_compacting.release_compacting(exhausted_sstables);
}
// Runs a single cleanup descriptor under the compaction scheduling group,
// retrying the same job via maybe_retry() until it succeeds, retrying is
// abandoned (stop_iteration::yes), or the exception is rethrown.
future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
co_await coroutine::switch_to(_cm._compaction_controller.sg());
for (;;) {
// Track this user-initiated work with a fixed-shares (200) backlog
// tracker registered with the compaction controller.
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
_cm.register_backlog_tracker(user_initiated);
std::exception_ptr ex;
try {
setup_new_compaction(descriptor.run_identifier);
co_await compact_sstables(descriptor, _compaction_data,
std::bind(&cleanup_sstables_compaction_task::release_exhausted, this, std::placeholders::_1));
finish_compaction();
// A finished job may have freed disk space; let postponed
// compactions be re-evaluated.
_cm.reevaluate_postponed_compactions();
co_return; // done with current job
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
// retry current job or rethrows exception
if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
co_return;
}
}
}
};
bool needs_cleanup(const sstables::shared_sstable& sst,
const dht::token_range_vector& sorted_owned_ranges,
schema_ptr s) {
@@ -1208,8 +1282,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
});
};
if (check_for_cleanup()) {
return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
t->schema()->ks_name(), t->schema()->cf_name())));
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
t->schema()->ks_name(), t->schema()->cf_name()));
}
auto sorted_owned_ranges = db.get_keyspace_local_ranges(t->schema()->ks_name());
@@ -1226,7 +1300,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
});
};
return rewrite_sstables(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables));
co_await perform_task_on_all_files<cleanup_sstables_compaction_task>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
std::move(get_sstables));
}
// Submit a table to be upgraded and wait for its termination.

View File

@@ -222,6 +222,7 @@ public:
class regular_compaction_task;
class offstrategy_compaction_task;
class rewrite_sstables_compaction_task;
class cleanup_sstables_compaction_task;
class validate_sstables_compaction_task;
class compaction_manager_test_task;
@@ -327,6 +328,12 @@ private:
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
// Guarantees that a maintenance task, e.g. cleanup, will be performed on all files available at the time
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, task>
future<> perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func, Args... args);
future<> rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
public:
compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as);