compaction: use compaction namespace in compaction_manager.cc

This commit is contained in:
Aleksandra Martyniuk
2023-03-29 15:28:14 +02:00
parent d7d570e39d
commit 0ceee3e4b3

View File

@@ -28,6 +28,7 @@
static logging::logger cmlog("compaction_manager");
using namespace std::chrono_literals;
using namespace compaction;
class compacting_sstable_registration {
compaction_manager& _cm;
@@ -161,7 +162,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const {
return std::min(unsigned(32), largest_fan_in);
}
bool compaction_manager::can_register_compaction(compaction::table_state& t, int weight, unsigned fan_in) const {
bool compaction_manager::can_register_compaction(table_state& t, int weight, unsigned fan_in) const {
// Only one weight is allowed if parallel compaction is disabled.
if (!t.get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) {
return false;
@@ -197,21 +198,21 @@ void compaction_manager::deregister_weight(int weight) {
reevaluate_postponed_compactions();
}
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction::table_state& table_s) {
std::vector<sstables::shared_sstable> in_strategy_sstables(table_state& table_s) {
auto sstables = table_s.main_sstable_set().all();
return boost::copy_range<std::vector<sstables::shared_sstable>>(*sstables | boost::adaptors::filtered([] (const sstables::shared_sstable& sst) {
return sstables::is_eligible_for_compaction(sst);
}));
}
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(compaction::table_state& t) {
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(table_state& t) {
std::vector<sstables::shared_sstable> candidates;
candidates.reserve(t.main_sstable_set().size());
// prevents sstables that belongs to a partial run being generated by ongoing compaction from being
// selected for compaction, which could potentially result in wrong behavior.
auto partial_run_identifiers = boost::copy_range<std::unordered_set<sstables::run_id>>(_tasks
| boost::adaptors::filtered(std::mem_fn(&compaction::compaction_task_executor::generating_output_run))
| boost::adaptors::transformed(std::mem_fn(&compaction::compaction_task_executor::output_run_id)));
| boost::adaptors::filtered(std::mem_fn(&compaction_task_executor::generating_output_run))
| boost::adaptors::transformed(std::mem_fn(&compaction_task_executor::output_run_id)));
// Filter out sstables that are being compacted.
for (auto& sst : in_strategy_sstables(t)) {
@@ -260,7 +261,7 @@ private:
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
};
compaction_manager::compaction_state& compaction_manager::get_compaction_state(compaction::table_state* t) {
compaction_manager::compaction_state& compaction_manager::get_compaction_state(table_state* t) {
try {
return _compaction_state.at(t);
} catch (std::out_of_range&) {
@@ -269,7 +270,7 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(c
}
}
compaction::compaction_task_executor::compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc)
compaction_task_executor::compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type type, sstring desc)
: _cm(mgr)
, _compacting_table(t)
, _compaction_state(_cm.get_compaction_state(t))
@@ -278,7 +279,7 @@ compaction::compaction_task_executor::compaction_task_executor(compaction_manage
, _description(std::move(desc))
{}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction::compaction_task_executor> task) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task) {
_tasks.push_back(task);
auto unregister_task = defer([this, task] {
_tasks.remove(task);
@@ -309,7 +310,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
co_return std::nullopt;
}
future<sstables::compaction_result> compaction::compaction_task_executor::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
future<sstables::compaction_result> compaction_task_executor::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
if (!descriptor.sstables.size()) {
// if there is nothing to compact, just return.
co_return sstables::compaction_result{};
@@ -324,8 +325,8 @@ future<sstables::compaction_result> compaction::compaction_task_executor::compac
co_return res;
}
future<sstables::compaction_result> compaction::compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
compaction::table_state& t = *_compacting_table;
future<sstables::compaction_result> compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
table_state& t = *_compacting_table;
if (can_purge) {
descriptor.enable_garbage_collection(t.main_sstable_set());
}
@@ -346,7 +347,7 @@ future<sstables::compaction_result> compaction::compaction_task_executor::compac
co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t);
}
future<> compaction::compaction_task_executor::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
future<> compaction_task_executor::update_history(table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
auto ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(res.stats.ended_at.time_since_epoch());
if (_cm._sys_ks) {
@@ -378,7 +379,7 @@ protected:
sstables::shared_sstable consume_sstable();
public:
explicit sstables_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
explicit sstables_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
: compaction_task_executor(mgr, t, compaction_type, std::move(desc))
{
set_sstables(std::move(sstables));
@@ -389,7 +390,7 @@ public:
class major_compaction_task_executor : public compaction_task_executor {
public:
major_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t)
major_compaction_task_executor(compaction_manager& mgr, table_state* t)
: compaction_task_executor(mgr, t, sstables::compaction_type::Compaction, "Major compaction")
{}
@@ -409,7 +410,7 @@ protected:
// candidates are sstables that aren't being operated on by other compaction types.
// those are eligible for major compaction.
compaction::table_state* t = _compacting_table;
table_state* t = _compacting_table;
sstables::compaction_strategy cs = t->get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t));
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
@@ -437,11 +438,11 @@ protected:
}
future<> compaction_manager::perform_major_compaction(compaction::table_state& t) {
future<> compaction_manager::perform_major_compaction(table_state& t) {
if (_state != state::enabled) {
return make_ready_future<>();
}
return perform_task(make_shared<compaction::major_compaction_task_executor>(*this, &t)).discard_result();;
return perform_task(make_shared<major_compaction_task_executor>(*this, &t)).discard_result();;
}
namespace compaction {
@@ -450,7 +451,7 @@ class custom_compaction_task_executor : public compaction_task_executor {
noncopyable_function<future<>(sstables::compaction_data&)> _job;
public:
custom_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
custom_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
: compaction_task_executor(mgr, t, type, std::move(desc))
, _job(std::move(job))
{}
@@ -480,12 +481,12 @@ protected:
}
future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
future<> compaction_manager::run_custom_job(table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
if (_state != state::enabled) {
return make_ready_future<>();
}
return perform_task(make_shared<compaction::custom_compaction_task_executor>(*this, &t, type, desc, std::move(job))).discard_result();
return perform_task(make_shared<custom_compaction_task_executor>(*this, &t, type, desc, std::move(job))).discard_result();
}
future<> compaction_manager::update_static_shares(float static_shares) {
@@ -493,7 +494,7 @@ future<> compaction_manager::update_static_shares(float static_shares) {
return _compaction_controller.update_static_shares(static_shares);
}
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t)
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, table_state& t)
: _cm(cm)
, _table(&t)
, _compaction_state(cm.get_compaction_state(_table))
@@ -526,20 +527,20 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() {
}
future<compaction_manager::compaction_reenabler>
compaction_manager::stop_and_disable_compaction(compaction::table_state& t) {
compaction_manager::stop_and_disable_compaction(table_state& t) {
compaction_reenabler cre(*this, t);
co_await stop_ongoing_compactions("user-triggered operation", &t);
co_return cre;
}
future<>
compaction_manager::run_with_compaction_disabled(compaction::table_state& t, std::function<future<> ()> func) {
compaction_manager::run_with_compaction_disabled(table_state& t, std::function<future<> ()> func) {
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
co_await func();
}
std::string_view compaction::compaction_task_executor::to_string(state s) {
std::string_view compaction_task_executor::to_string(state s) {
switch (s) {
case state::none: return "none";
case state::pending: return "pending";
@@ -553,11 +554,11 @@ std::string_view compaction::compaction_task_executor::to_string(state s) {
namespace compaction {
std::ostream& operator<<(std::ostream& os, compaction::compaction_task_executor::state s) {
return os << compaction::compaction_task_executor::to_string(s);
std::ostream& operator<<(std::ostream& os, compaction_task_executor::state s) {
return os << compaction_task_executor::to_string(s);
}
std::ostream& operator<<(std::ostream& os, const compaction::compaction_task_executor& task) {
std::ostream& operator<<(std::ostream& os, const compaction_task_executor& task) {
return os << task.describe();
}
@@ -571,21 +572,21 @@ compaction_manager::compaction_state::~compaction_state() {
compaction_done.broken();
}
std::string compaction::compaction_task_executor::describe() const {
std::string compaction_task_executor::describe() const {
auto* t = _compacting_table;
auto s = t->schema();
return fmt::format("{} task {} for table {}.{} [{}]", _description, fmt::ptr(this), s->ks_name(), s->cf_name(), fmt::ptr(t));
}
compaction::compaction_task_executor::~compaction_task_executor() {
compaction_task_executor::~compaction_task_executor() {
switch_state(state::none);
}
compaction::sstables_task_executor::~sstables_task_executor() {
sstables_task_executor::~sstables_task_executor() {
_cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending);
}
future<compaction_manager::compaction_stats_opt> compaction::compaction_task_executor::run() noexcept {
future<compaction_manager::compaction_stats_opt> compaction_task_executor::run() noexcept {
try {
_compaction_done = do_run();
return compaction_done();
@@ -594,7 +595,7 @@ future<compaction_manager::compaction_stats_opt> compaction::compaction_task_exe
}
}
compaction::compaction_task_executor::state compaction::compaction_task_executor::switch_state(state new_state) {
compaction_task_executor::state compaction_task_executor::switch_state(state new_state) {
auto old_state = std::exchange(_state, new_state);
switch (old_state) {
case state::none:
@@ -629,7 +630,7 @@ compaction::compaction_task_executor::state compaction::compaction_task_executor
return old_state;
}
void compaction::sstables_task_executor::set_sstables(std::vector<sstables::shared_sstable> new_sstables) {
void sstables_task_executor::set_sstables(std::vector<sstables::shared_sstable> new_sstables) {
if (!_sstables.empty()) {
on_internal_error(cmlog, format("sstables were already set"));
}
@@ -638,7 +639,7 @@ void compaction::sstables_task_executor::set_sstables(std::vector<sstables::shar
_cm._stats.pending_tasks += _sstables.size() - (_state == state::pending);
}
sstables::shared_sstable compaction::sstables_task_executor::consume_sstable() {
sstables::shared_sstable sstables_task_executor::consume_sstable() {
if (_sstables.empty()) {
on_internal_error(cmlog, format("no more sstables"));
}
@@ -649,7 +650,7 @@ sstables::shared_sstable compaction::sstables_task_executor::consume_sstable() {
return sst;
}
future<semaphore_units<named_semaphore_exception_factory>> compaction::compaction_task_executor::acquire_semaphore(named_semaphore& sem, size_t units) {
future<semaphore_units<named_semaphore_exception_factory>> compaction_task_executor::acquire_semaphore(named_semaphore& sem, size_t units) {
return seastar::get_units(sem, units, _compaction_data.abort).handle_exception_type([this] (const abort_requested_exception& e) {
auto s = _compacting_table->schema();
return make_exception_future<semaphore_units<named_semaphore_exception_factory>>(
@@ -657,13 +658,13 @@ future<semaphore_units<named_semaphore_exception_factory>> compaction::compactio
});
}
void compaction::compaction_task_executor::setup_new_compaction(sstables::run_id output_run_id) {
void compaction_task_executor::setup_new_compaction(sstables::run_id output_run_id) {
_compaction_data = _cm.create_compaction_data();
_output_run_identifier = output_run_id;
switch_state(state::active);
}
void compaction::compaction_task_executor::finish_compaction(state finish_state) noexcept {
void compaction_task_executor::finish_compaction(state finish_state) noexcept {
switch_state(finish_state);
_output_run_identifier = sstables::run_id::create_null_id();
if (finish_state != state::failed) {
@@ -672,17 +673,17 @@ void compaction::compaction_task_executor::finish_compaction(state finish_state)
_compaction_state.compaction_done.signal();
}
void compaction::compaction_task_executor::stop(sstring reason) noexcept {
void compaction_task_executor::stop(sstring reason) noexcept {
_compaction_data.stop(std::move(reason));
}
sstables::compaction_stopped_exception compaction::compaction_task_executor::make_compaction_stopped_exception() const {
sstables::compaction_stopped_exception compaction_task_executor::make_compaction_stopped_exception() const {
auto s = _compacting_table->schema();
return sstables::compaction_stopped_exception(s->ks_name(), s->cf_name(), _compaction_data.stop_requested);
}
compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task_manager& tm)
: _task_manager_module(make_shared<compaction::task_manager_module>(tm))
: _task_manager_module(make_shared<task_manager_module>(tm))
, _cfg(std::move(cfg))
, _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), [this] () -> float {
_last_backlog = backlog();
@@ -718,7 +719,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
}
compaction_manager::compaction_manager(tasks::task_manager& tm)
: _task_manager_module(make_shared<compaction::task_manager_module>(tm))
: _task_manager_module(make_shared<task_manager_module>(tm))
, _cfg(config{ .available_memory = 1 })
, _compaction_controller(make_compaction_controller(compaction_sg(), 1, [] () -> float { return 1.0; }))
, _backlog_manager(_compaction_controller)
@@ -803,7 +804,7 @@ future<> compaction_manager::postponed_compactions_reevaluation() {
auto postponed = std::exchange(_postponed, {});
try {
for (auto it = postponed.begin(); it != postponed.end();) {
compaction::table_state* t = *it;
table_state* t = *it;
it = postponed.erase(it);
// skip reevaluation of a table_state that became invalid post its removal
if (!_compaction_state.contains(t)) {
@@ -824,11 +825,11 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept {
_postponed_reevaluation.signal();
}
void compaction_manager::postpone_compaction_for_table(compaction::table_state* t) {
void compaction_manager::postpone_compaction_for_table(table_state* t) {
_postponed.insert(t);
}
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>> tasks, sstring reason) {
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason) {
// To prevent compaction from being postponed while tasks are being stopped,
// let's stop all tasks before the deferring point below.
for (auto& t : tasks) {
@@ -849,10 +850,10 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction::compa
});
}
future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction::table_state* t, std::optional<sstables::compaction_type> type_opt) noexcept {
future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_state* t, std::optional<sstables::compaction_type> type_opt) noexcept {
try {
auto ongoing_compactions = get_compactions(t).size();
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction::compaction_task_executor>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction_task_executor>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt);
}));
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info;
@@ -922,11 +923,11 @@ void compaction_manager::do_stop() noexcept {
}
}
inline bool compaction_manager::can_proceed(compaction::table_state* t) const {
inline bool compaction_manager::can_proceed(table_state* t) const {
return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled();
}
inline bool compaction::compaction_task_executor::can_proceed(throw_if_stopping do_throw_if_stopping) const {
inline bool compaction_task_executor::can_proceed(throw_if_stopping do_throw_if_stopping) const {
if (stopping()) {
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
if (do_throw_if_stopping) {
@@ -937,7 +938,7 @@ inline bool compaction::compaction_task_executor::can_proceed(throw_if_stopping
return _cm.can_proceed(_compacting_table);
}
future<stop_iteration> compaction::compaction_task_executor::maybe_retry(std::exception_ptr err, bool throw_on_abort) {
future<stop_iteration> compaction_task_executor::maybe_retry(std::exception_ptr err, bool throw_on_abort) {
try {
std::rethrow_exception(err);
} catch (sstables::compaction_stopped_exception& e) {
@@ -974,7 +975,7 @@ namespace compaction {
class regular_compaction_task_executor : public compaction_task_executor {
public:
regular_compaction_task_executor(compaction_manager& mgr, compaction::table_state& t)
regular_compaction_task_executor(compaction_manager& mgr, table_state& t)
: compaction_task_executor(mgr, &t, sstables::compaction_type::Compaction, "Compaction")
{}
protected:
@@ -992,7 +993,7 @@ protected:
co_return std::nullopt;
}
compaction::table_state& t = *_compacting_table;
table_state& t = *_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t, _cm.get_strategy_control(), _cm.get_candidates(t));
int weight = calculate_weight(descriptor);
@@ -1057,21 +1058,21 @@ protected:
}
void compaction_manager::submit(compaction::table_state& t) {
void compaction_manager::submit(table_state& t) {
if (_state != state::enabled || t.is_auto_compaction_disabled_by_user()) {
return;
}
// OK to drop future.
// waited via task->stop()
(void)perform_task(make_shared<compaction::regular_compaction_task_executor>(*this, t));
(void)perform_task(make_shared<regular_compaction_task_executor>(*this, t));
}
bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) {
bool compaction_manager::can_perform_regular_compaction(table_state& t) {
return can_proceed(&t) && !t.is_auto_compaction_disabled_by_user();
}
future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::table_state& t) {
future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state& t) {
auto schema = t.schema();
if (!can_perform_regular_compaction(t)) {
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction",
@@ -1116,7 +1117,7 @@ namespace compaction {
class offstrategy_compaction_task_executor : public compaction_task_executor {
bool _performed = false;
public:
offstrategy_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t)
offstrategy_compaction_task_executor(compaction_manager& mgr, table_state* t)
: compaction_task_executor(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
{}
@@ -1136,7 +1137,7 @@ private:
// by the fact that off-strategy is serialized across all tables, meaning that the
// actual requirement is the size of the largest table's maintenance set.
compaction::table_state& t = *_compacting_table;
table_state& t = *_compacting_table;
const auto& maintenance_sstables = t.maintenance_sstable_set();
// Filter out sstables that require view building, to avoid a race between off-strategy
@@ -1232,7 +1233,7 @@ protected:
std::exception_ptr ex;
try {
compaction::table_state& t = *_compacting_table;
table_state& t = *_compacting_table;
auto maintenance_sstables = t.maintenance_sstable_set().all();
cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found",
t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables->size());
@@ -1256,11 +1257,11 @@ protected:
}
future<bool> compaction_manager::perform_offstrategy(compaction::table_state& t) {
future<bool> compaction_manager::perform_offstrategy(table_state& t) {
if (_state != state::enabled) {
co_return false;
}
auto task = make_shared<compaction::offstrategy_compaction_task_executor>(*this, &t);
auto task = make_shared<offstrategy_compaction_task_executor>(*this, &t);
co_await perform_task(task);
co_return task->performed();
}
@@ -1273,7 +1274,7 @@ class rewrite_sstables_compaction_task_executor : public sstables_task_executor
compaction_manager::can_purge_tombstones _can_purge;
public:
rewrite_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options,
std::vector<sstables::shared_sstable> sstables, compacting_sstable_registration compacting,
compaction_manager::can_purge_tombstones can_purge)
: sstables_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
@@ -1343,8 +1344,8 @@ private:
}
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction::compaction_task_executor>
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
requires std::derived_from<TaskType, compaction_task_executor>
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
if (_state != state::enabled) {
co_return std::nullopt;
}
@@ -1371,15 +1372,15 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
co_return co_await perform_task(seastar::make_shared<TaskType>(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
}
future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<compaction::rewrite_sstables_compaction_task_executor>(t, std::move(options), std::move(get_func), can_purge);
future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<rewrite_sstables_compaction_task_executor>(t, std::move(options), std::move(get_func), can_purge);
}
namespace compaction {
class validate_sstables_compaction_task_executor : public sstables_task_executor {
public:
validate_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, std::vector<sstables::shared_sstable> sstables)
validate_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, std::vector<sstables::shared_sstable> sstables)
: sstables_task_executor(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables))
{}
@@ -1433,20 +1434,20 @@ private:
}
static std::vector<sstables::shared_sstable> get_all_sstables(compaction::table_state& t) {
static std::vector<sstables::shared_sstable> get_all_sstables(table_state& t) {
auto s = boost::copy_range<std::vector<sstables::shared_sstable>>(*t.main_sstable_set().all());
auto maintenance_set = t.maintenance_sstable_set().all();
s.insert(s.end(), maintenance_set->begin(), maintenance_set->end());
return s;
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(table_state& t) {
if (_state != state::enabled) {
return make_ready_future<compaction_manager::compaction_stats_opt>();
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = get_all_sstables(t);
return perform_task(seastar::make_shared<compaction::validate_sstables_compaction_task_executor>(*this, &t, std::move(all_sstables)));
return perform_task(seastar::make_shared<validate_sstables_compaction_task_executor>(*this, &t, std::move(all_sstables)));
}
namespace compaction {
@@ -1456,7 +1457,7 @@ class cleanup_sstables_compaction_task_executor : public compaction_task_executo
compacting_sstable_registration _compacting;
std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
public:
cleanup_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options,
std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
: compaction_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
, _cleanup_options(std::move(options))
@@ -1545,7 +1546,7 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
return true;
}
future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t) {
future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) {
auto check_for_cleanup = [this, &t] {
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
return task->compacting_table() == &t && task->type() == sstables::compaction_type::Cleanup;
@@ -1569,12 +1570,12 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
});
};
co_await perform_task_on_all_files<compaction::cleanup_sstables_compaction_task_executor>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
co_await perform_task_on_all_files<cleanup_sstables_compaction_task_executor>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
std::move(get_sstables));
}
// Submit a table to be upgraded and wait for its termination.
future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version) {
future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, table_state& t, bool exclude_current_version) {
auto get_sstables = [this, &t, exclude_current_version] {
std::vector<sstables::shared_sstable> tables;
@@ -1602,7 +1603,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
}
// Submit a table to be scrubbed and wait for its termination.
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub(table_state& t, sstables::compaction_type_options::scrub opts) {
auto scrub_mode = opts.operation_mode;
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(t);
@@ -1633,7 +1634,7 @@ compaction_manager::compaction_state::compaction_state(table_state& t)
{
}
void compaction_manager::add(compaction::table_state& t) {
void compaction_manager::add(table_state& t) {
auto [_, inserted] = _compaction_state.try_emplace(&t, t);
if (!inserted) {
auto s = t.schema();
@@ -1641,7 +1642,7 @@ void compaction_manager::add(compaction::table_state& t) {
}
}
future<> compaction_manager::remove(compaction::table_state& t) noexcept {
future<> compaction_manager::remove(table_state& t) noexcept {
auto& c_state = get_compaction_state(&t);
// We need to guarantee that a task being stopped will not retry to compact
@@ -1675,8 +1676,8 @@ future<> compaction_manager::remove(compaction::table_state& t) noexcept {
#endif
}
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(compaction::table_state* t) const {
auto to_info = [] (const shared_ptr<compaction::compaction_task_executor>& task) {
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(table_state* t) const {
auto to_info = [] (const shared_ptr<compaction_task_executor>& task) {
sstables::compaction_info ret;
ret.compaction_uuid = task->compaction_data().compaction_uuid;
ret.type = task->type();
@@ -1687,22 +1688,22 @@ const std::vector<sstables::compaction_info> compaction_manager::get_compactions
return ret;
};
using ret = std::vector<sstables::compaction_info>;
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction::compaction_task_executor>& task) {
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction_task_executor>& task) {
return (!t || task->compacting_table() == t) && task->compaction_running();
}) | boost::adaptors::transformed(to_info));
}
bool compaction_manager::has_table_ongoing_compaction(const compaction::table_state& t) const {
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction::compaction_task_executor>& task) {
bool compaction_manager::has_table_ongoing_compaction(const table_state& t) const {
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction_task_executor>& task) {
return task->compacting_table() == &t && task->compaction_running();
});
};
bool compaction_manager::compaction_disabled(compaction::table_state& t) const {
bool compaction_manager::compaction_disabled(table_state& t) const {
return _compaction_state.contains(&t) && _compaction_state.at(&t).compaction_disabled();
}
future<> compaction_manager::stop_compaction(sstring type, compaction::table_state* table) {
future<> compaction_manager::stop_compaction(sstring type, table_state* table) {
sstables::compaction_type target_type;
try {
target_type = sstables::to_compaction_type(type);
@@ -1721,7 +1722,7 @@ future<> compaction_manager::stop_compaction(sstring type, compaction::table_sta
return stop_ongoing_compactions("user request", table, target_type);
}
void compaction_manager::propagate_replacement(compaction::table_state& t,
void compaction_manager::propagate_replacement(table_state& t,
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
for (auto& task : _tasks) {
if (task->compacting_table() == &t && task->compaction_running()) {
@@ -1736,7 +1737,7 @@ public:
explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {}
bool has_ongoing_compaction(table_state& table_s) const noexcept override {
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction::compaction_task_executor>& task) {
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction_task_executor>& task) {
return task->compaction_running()
&& task->compacting_table()->schema()->ks_name() == s->ks_name()
&& task->compacting_table()->schema()->cf_name() == s->cf_name();
@@ -1744,7 +1745,7 @@ public:
}
};
compaction::strategy_control& compaction_manager::get_strategy_control() const noexcept {
strategy_control& compaction_manager::get_strategy_control() const noexcept {
return *_strategy_control;
}
@@ -1890,13 +1891,13 @@ compaction_backlog_manager::~compaction_backlog_manager() {
}
}
void compaction_manager::register_backlog_tracker(compaction::table_state& t, compaction_backlog_tracker new_backlog_tracker) {
void compaction_manager::register_backlog_tracker(table_state& t, compaction_backlog_tracker new_backlog_tracker) {
auto& cs = get_compaction_state(&t);
cs.backlog_tracker = std::move(new_backlog_tracker);
register_backlog_tracker(cs.backlog_tracker);
}
compaction_backlog_tracker& compaction_manager::get_backlog_tracker(compaction::table_state& t) {
compaction_backlog_tracker& compaction_manager::get_backlog_tracker(table_state& t) {
auto& cs = get_compaction_state(&t);
return cs.backlog_tracker;
}