compaction_manager: move per-type logic to derived task

Move the business logic into the task-specific classes:
initialization is separated out into the task constructor,
the former compaction_done logic is moved into
a do_run() method, and in some cases a
lambda function that was called per table (as in
rewrite_sstables) is moved into a private method of the
derived class.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-03-10 09:16:44 +02:00
parent 2e6ce43a97
commit a2a5e530f0
5 changed files with 414 additions and 345 deletions

View File

@@ -254,10 +254,32 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(r
}
}
shared_ptr<compaction_manager::task> compaction_manager::register_task(shared_ptr<compaction_manager::task> task) {
future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task) {
_tasks.push_back(task);
auto unregister_task = defer([this, task] {
_tasks.remove(task);
});
cmlog.debug("{}: started", *task);
return task;
try {
co_await task->run();
cmlog.debug("{}: done", *task);
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("{}: stopped, reason: {}", *task, e.what());
} catch (sstables::compaction_aborted_exception& e) {
cmlog.error("{}: aborted, reason: {}", *task, e.what());
_stats.errors++;
throw;
} catch (storage_io_error& e) {
_stats.errors++;
cmlog.error("{}: failed due to storage io error: {}: stopping", *task, e.what());
do_stop();
throw;
} catch (...) {
cmlog.error("{}: failed, reason {}: stopping", *task, std::current_exception());
_stats.errors++;
throw;
}
}
class compaction_manager::major_compaction_task : public compaction_manager::task {
@@ -265,66 +287,76 @@ public:
major_compaction_task(compaction_manager& mgr, replica::table* t)
: task(mgr, t, sstables::compaction_type::Compaction, "Major compaction")
{}
protected:
// first take major compaction semaphore, then exclusively take compaction lock for table.
// it cannot be the other way around, or minor compaction for this table would be
// prevented while an ongoing major compaction doesn't release the semaphore.
virtual future<> do_run() override {
switch_state(state::pending);
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
if (!can_proceed()) {
co_return;
}
// candidates are sstables that aren't being operated on by other compaction types.
// those are eligible for major compaction.
auto* t = _compacting_table;
sstables::compaction_strategy cs = t->get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(*t));
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
};
setup_new_compaction(descriptor.run_identifier);
cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
compaction_backlog_tracker bt(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
_cm.register_backlog_tracker(bt);
// FIXME: co_await coroutine::switch_to
co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, t, descriptor = std::move(descriptor)] () mutable {
return t->compact_sstables(std::move(descriptor), _compaction_data);
});
finish_compaction();
}
};
future<> compaction_manager::perform_major_compaction(replica::table* t) {
if (_state != state::enabled) {
return make_ready_future<>();
}
auto task = register_task(make_shared<major_compaction_task>(*this, t));
// first take major compaction semaphore, then exclusively take compaction lock for table.
// it cannot be the other way around, or minor compaction for this table would be
// prevented while an ongoing major compaction doesn't release the semaphore.
task->_compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, t] {
return with_lock(task->compaction_state().lock.for_write(), [this, task, t] {
_stats.active_tasks++;
if (!task->can_proceed()) {
return make_ready_future<>();
}
// candidates are sstables that aren't being operated on by other compaction types.
// those are eligible for major compaction.
sstables::compaction_strategy cs = t->get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), get_candidates(*t));
auto compacting = make_lw_shared<compacting_sstable_registration>(*this, descriptor.sstables);
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting->release_compacting(exhausted_sstables);
};
task->setup_new_compaction(descriptor.run_identifier);
cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
return do_with(std::move(user_initiated), [this, t, descriptor = std::move(descriptor), task] (compaction_backlog_tracker& bt) mutable {
register_backlog_tracker(bt);
return with_scheduling_group(_compaction_controller.sg(), [this, t, descriptor = std::move(descriptor), task] () mutable {
return t->compact_sstables(std::move(descriptor), task->compaction_data());
});
}).then([compacting = std::move(compacting)] {});
});
}).then_wrapped([this, task] (future<> f) {
_stats.active_tasks--;
_tasks.remove(task);
cmlog.debug("Major compaction {}: done", *task);
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("major compaction stopped, reason: {}", e.what());
} catch (...) {
cmlog.error("major compaction failed, reason: {}", std::current_exception());
_stats.errors++;
}
});
return task->compaction_done().then([task] {});
return perform_task(make_shared<major_compaction_task>(*this, t));
}
class compaction_manager::custom_compaction_task : public compaction_manager::task {
noncopyable_function<future<>(sstables::compaction_data&)> _job;
public:
custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc)
custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
: task(mgr, t, type, std::move(desc))
, _job(std::move(job))
{}
protected:
virtual future<> do_run() override {
if (!can_proceed(throw_if_stopping::yes)) {
co_return;
}
switch_state(state::pending);
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
if (!can_proceed(throw_if_stopping::yes)) {
co_return;
}
setup_new_compaction();
// NOTE:
// no need to register shared sstables because they're excluded from non-resharding
// compaction and some of them may not even belong to current shard.
co_await _job(compaction_data());
finish_compaction();
}
};
future<> compaction_manager::run_custom_job(replica::table* t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
@@ -332,40 +364,7 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
return make_ready_future<>();
}
auto task = register_task(make_shared<custom_compaction_task>(*this, t, type, desc));
auto job_ptr = std::make_unique<noncopyable_function<future<>(sstables::compaction_data&)>>(std::move(job));
task->_compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, &job = *job_ptr] () mutable {
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
if (task->stopping()) {
return make_exception_future<>(task->make_compaction_stopped_exception());
}
_stats.active_tasks++;
if (!task->can_proceed()) {
return make_ready_future<>();
}
task->setup_new_compaction();
// NOTE:
// no need to register shared sstables because they're excluded from non-resharding
// compaction and some of them may not even belong to current shard.
return job(task->compaction_data());
}).then_wrapped([this, task, job_ptr = std::move(job_ptr), type] (future<> f) {
_stats.active_tasks--;
_tasks.remove(task);
cmlog.debug("{} {}: done", type, *task);
try {
f.get();
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("{} was abruptly stopped, reason: {}", *task, e.what());
throw;
} catch (...) {
cmlog.error("{} failed: {}", *task, std::current_exception());
throw;
}
});
return task->compaction_done().then([task] {});
return perform_task(make_shared<custom_compaction_task>(*this, t, type, desc, std::move(job)));
}
future<>
@@ -426,6 +425,19 @@ compaction_manager::task::~task() {
switch_state(state::none);
}
// Undo the pending-task accounting done by set_sstables(): every sstable still
// queued counts as one pending unit, minus one if this task is itself in the
// pending state (which the base class's own state accounting covers).
compaction_manager::sstables_task::~sstables_task() {
_cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending);
}
// Entry point called by compaction_manager::perform_task(): starts the derived
// class's do_run() and returns its completion future.  Any exception thrown
// synchronously by do_run() (before the first suspension point) is converted
// into a failed future, so callers of run() never see a throw — hence noexcept.
future<> compaction_manager::task::run() noexcept {
try {
_compaction_done = do_run();
return compaction_done();
} catch (...) {
return current_exception_as_future();
}
}
compaction_manager::task::state compaction_manager::task::switch_state(state new_state) {
auto old_state = std::exchange(_state, new_state);
switch (old_state) {
@@ -461,15 +473,40 @@ compaction_manager::task::state compaction_manager::task::switch_state(state new
return old_state;
}
// Install the set of sstables this task will operate on.  May be called at most
// once (typically from the sstables_task constructor); calling it again with a
// non-empty set already installed is an internal error.
void compaction_manager::sstables_task::set_sstables(std::vector<sstables::shared_sstable> new_sstables) {
if (!_sstables.empty()) {
on_internal_error(cmlog, format("sstables were already set"));
}
_sstables = std::move(new_sstables);
cmlog.debug("{}: set_sstables: {} sstable{}", *this, _sstables.size(), _sstables.size() > 1 ? "s" : "");
// Each queued sstable counts as one pending unit; subtract one if this task is
// already counted as pending by the base class's state accounting.  The
// destructor reverses this exact adjustment.
_cm._stats.pending_tasks += _sstables.size() - (_state == state::pending);
}
// Pop the next sstable to process off the back of the queue.  Calling this with
// an empty queue is an internal error — callers are expected to check
// _sstables.empty() first (as the do_run() loops do).
sstables::shared_sstable compaction_manager::sstables_task::consume_sstable() {
if (_sstables.empty()) {
on_internal_error(cmlog, format("no more sstables"));
}
auto sst = _sstables.back();
_sstables.pop_back();
--_cm._stats.pending_tasks; // from this point on, switch_state(pending|active) works the same way as any other task
cmlog.debug("{}", format("consumed {}", sst->get_filename()));
return sst;
}
// Prepare per-compaction state just before a compaction job starts: fresh
// compaction_data, the output run identifier (utils::null_uuid() when the
// caller has none), and a transition to the active state.  Paired with
// finish_compaction(), which tears this state down.
void compaction_manager::task::setup_new_compaction(utils::UUID output_run_id) {
_compaction_data = create_compaction_data();
_compaction_running = true;
_output_run_identifier = output_run_id;
switch_state(state::active);
}
void compaction_manager::task::finish_compaction() noexcept {
void compaction_manager::task::finish_compaction(state finish_state) noexcept {
switch_state(finish_state);
_compaction_running = false;
_output_run_identifier = utils::null_uuid();
if (finish_state != state::failed) {
_compaction_retry.reset();
}
}
void compaction_manager::task::stop(sstring reason) noexcept {
@@ -581,6 +618,8 @@ void compaction_manager::postponed_compactions_reevaluation() {
auto postponed = std::move(_postponed);
try {
for (auto& t : postponed) {
auto s = t->schema();
cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t));
submit(t);
}
} catch (...) {
@@ -694,9 +733,12 @@ inline bool compaction_manager::can_proceed(replica::table* t) const {
return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled();
}
inline bool compaction_manager::task::can_proceed() const {
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_stopping) const {
if (stopping()) {
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
if (do_throw_if_stopping) {
throw make_compaction_stopped_exception();
}
return false;
}
return _cm.can_proceed(_compacting_table);
@@ -720,7 +762,7 @@ future<stop_iteration> compaction_manager::task::maybe_retry(std::exception_ptr
_cm._stats.errors++;
cmlog.error("{}: failed: {}. Will retry in {} seconds", *this, std::current_exception(),
std::chrono::duration_cast<std::chrono::seconds>(_compaction_retry.sleep_time()).count());
_cm._stats.pending_tasks++;
switch_state(state::pending);
return _compaction_retry.retry(_compaction_data.abort).handle_exception_type([this] (sleep_aborted&) {
return make_exception_future<>(make_compaction_stopped_exception());
}).then([] {
@@ -737,70 +779,72 @@ public:
regular_compaction_task(compaction_manager& mgr, replica::table* t)
: task(mgr, t, sstables::compaction_type::Compaction, "Compaction")
{}
protected:
virtual future<> do_run() override {
for (auto completed = stop_iteration::no; !completed; ) {
if (!can_proceed()) {
co_return;
}
switch_state(state::pending);
// take read lock for table, so major and regular compaction can't proceed in parallel.
auto lock_holder = co_await _compaction_state.lock.hold_read_lock();
if (!can_proceed()) {
co_return;
}
// FIXME: co_await coroutine::switch_to
completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this] () mutable -> future<stop_iteration> {
replica::table& t = *_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t));
int weight = calculate_weight(descriptor);
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user());
co_return stop_iteration::yes;
}
if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) {
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
finish_compaction(state::postponed);
_cm.postpone_compaction_for_table(&t);
co_return stop_iteration::yes;
}
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
auto weight_r = compaction_weight_registration(&_cm, weight);
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
};
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}",
fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
setup_new_compaction(descriptor.run_identifier);
std::exception_ptr ex;
try {
co_await t.compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return stop_iteration::no;
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
co_return co_await maybe_retry(std::move(ex));
});
}
}
};
void compaction_manager::submit(replica::table* t) {
if (t->is_auto_compaction_disabled_by_user()) {
if (_state != state::enabled || t->is_auto_compaction_disabled_by_user()) {
return;
}
auto task = register_task(make_shared<regular_compaction_task>(*this, t));
_stats.pending_tasks++;
task->_compaction_done = repeat([this, task, t] () mutable {
if (!task->can_proceed()) {
_stats.pending_tasks--;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
// take read lock for table, so major and regular compaction can't proceed in parallel.
return with_lock(task->compaction_state().lock.for_read(), [this, task] () mutable {
return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable {
replica::table& t = *task->_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), get_strategy_control(), get_candidates(t));
int weight = calculate_weight(descriptor);
if (descriptor.sstables.empty() || !task->can_proceed() || t.is_auto_compaction_disabled_by_user()) {
_stats.pending_tasks--;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
if (!can_register_compaction(&t, weight, descriptor.fan_in())) {
_stats.pending_tasks--;
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
postpone_compaction_for_table(&t);
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto compacting = make_lw_shared<compacting_sstable_registration>(*this, descriptor.sstables);
auto weight_r = compaction_weight_registration(this, weight);
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting->release_compacting(exhausted_sstables);
};
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
_stats.pending_tasks--;
_stats.active_tasks++;
task->setup_new_compaction(descriptor.run_identifier);
return t.compact_sstables(std::move(descriptor), task->compaction_data()).then_wrapped([this, task, compacting = std::move(compacting), weight_r = std::move(weight_r)] (future<> f) mutable {
_stats.active_tasks--;
task->finish_compaction();
if (f.failed()) {
return task->maybe_retry(f.get_exception());
}
_stats.pending_tasks++;
_stats.completed_tasks++;
task->_compaction_retry.reset();
reevaluate_postponed_compactions();
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
});
}).finally([this, task] {
_tasks.remove(task);
cmlog.debug("Compaction {}: done", *task);
});
// OK to drop future.
// waited via task->stop()
(void)perform_task(make_shared<regular_compaction_task>(*this, t));
}
class compaction_manager::offstrategy_compaction_task : public compaction_manager::task {
@@ -808,189 +852,187 @@ public:
offstrategy_compaction_task(compaction_manager& mgr, replica::table* t)
: task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
{}
protected:
virtual future<> do_run() override {
for (;;) {
if (!can_proceed()) {
co_return;
}
switch_state(state::pending);
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
if (!can_proceed()) {
co_return;
}
setup_new_compaction();
std::exception_ptr ex;
try {
co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this] {
return _compacting_table->run_offstrategy_compaction(_compaction_data);
});
finish_compaction();
co_return;
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
co_return;
}
}
}
};
future<> compaction_manager::perform_offstrategy(replica::table* t) {
auto task = register_task(make_shared<offstrategy_compaction_task>(*this, t));
_stats.pending_tasks++;
task->_compaction_done = repeat([this, task, t] () mutable {
if (!task->can_proceed()) {
_stats.pending_tasks--;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return with_semaphore(_maintenance_ops_sem, 1, [this, task, t] () mutable {
_stats.pending_tasks--;
if (!task->can_proceed()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
_stats.active_tasks++;
task->setup_new_compaction();
return with_scheduling_group(_maintenance_sg.cpu, [this, task, t] {
return t->run_offstrategy_compaction(task->compaction_data()).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable {
_stats.active_tasks--;
task->finish_compaction();
if (!f.failed()) {
_stats.completed_tasks++;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return task->maybe_retry(f.get_exception());
});
});
});
}).finally([this, task] {
_tasks.remove(task);
cmlog.debug("Offstrategy compaction {}: done", *task);
});
return task->compaction_done().finally([task] {});
if (_state != state::enabled) {
return make_ready_future<>();
}
return perform_task(make_shared<offstrategy_compaction_task>(*this, t));
}
class compaction_manager::rewrite_sstables_compaction_task : public compaction_manager::task {
class compaction_manager::rewrite_sstables_compaction_task : public compaction_manager::sstables_task {
sstables::compaction_type_options _options;
compacting_sstable_registration _compacting;
can_purge_tombstones _can_purge;
public:
rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, const sstables::compaction_type_options& options)
: task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options, std::vector<sstables::shared_sstable> sstables, can_purge_tombstones can_purge)
: sstables_task(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
, _options(std::move(options))
, _compacting(mgr, _sstables)
, _can_purge(can_purge)
{}
protected:
virtual future<> do_run() override {
switch_state(state::pending);
auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1);
while (!_sstables.empty() && can_proceed()) {
auto sst = consume_sstable();
co_await rewrite_sstable(std::move(sst));
}
}
private:
future<> rewrite_sstable(const sstables::shared_sstable& sst) {
switch_state(state::active);
for (auto completed = stop_iteration::no; !completed; ) {
replica::table& t = *_compacting_table;
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
auto sstable_set_snapshot = _can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
// FIXME: this compaction should run with maintenance priority.
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options);
// Releases reference to cleaned sstable such that respective used disk space can be freed.
descriptor.release_exhausted = [this] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
_compacting.release_compacting(exhausted_sstables);
};
setup_new_compaction(descriptor.run_identifier);
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
_cm.register_backlog_tracker(user_initiated);
// FIXME: use coroutine::switch_to()
completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, descriptor = std::move(descriptor)] () mutable -> future<stop_iteration> {
std::exception_ptr ex;
try {
co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return stop_iteration::yes; // done with current sstable
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
co_return co_await maybe_retry(std::move(ex)); // retry current sstable or rethrows exception
});
}
}
};
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
std::vector<sstables::shared_sstable> sstables;
compacting_sstable_registration compacting(*this);
if (_state != state::enabled) {
co_return;
}
// since we might potentially have ongoing compactions, and we
// must ensure that all sstables created before we run are included
// in the re-write, we need to barrier out any previously running
// compaction.
co_await run_with_compaction_disabled(t, [&] () mutable -> future<> {
std::vector<sstables::shared_sstable> sstables;
co_await run_with_compaction_disabled(t, [this, &sstables, get_func = std::move(get_func)] () -> future<> {
sstables = co_await get_func();
compacting.register_compacting(sstables);
// sort sstables by size in descending order, such that the smallest files will be rewritten first
// (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher
// chance to succeed when the biggest files are reached.
std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) {
return a->data_size() > b->data_size();
});
});
// sort sstables by size in descending order, such that the smallest files will be rewritten first
// (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher
// chance to succeed when the biggest files are reached.
std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) {
return a->data_size() > b->data_size();
});
auto task = register_task(make_shared<rewrite_sstables_compaction_task>(*this, t, options));
_stats.pending_tasks += sstables.size();
auto task_completion = defer([this, &task, &sstables, &options] {
_stats.pending_tasks -= sstables.size();
_tasks.remove(task);
cmlog.debug("{} {}: done", options.type(), *task);
});
auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1);
auto rewrite_sstable = [this, &task, &options, &compacting, can_purge] (const sstables::shared_sstable& sst) mutable -> future<> {
stop_iteration completed = stop_iteration::no;
do {
replica::table& t = *task->_compacting_table;
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
// FIXME: this compaction should run with maintenance priority.
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
// Releases reference to cleaned sstable such that respective used disk space can be freed.
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
};
_stats.pending_tasks--;
_stats.active_tasks++;
task->setup_new_compaction(descriptor.run_identifier);
auto perform_rewrite = [this, &t, &descriptor, &task] () mutable -> future<stop_iteration> {
std::exception_ptr ex;
try {
auto compaction_completion = defer([&task, this] {
task->finish_compaction();
_stats.active_tasks--;
});
co_await t.compact_sstables(std::move(descriptor), task->compaction_data());
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_return co_await task->maybe_retry(std::move(ex));
}
_stats.completed_tasks++;
reevaluate_postponed_compactions();
co_return stop_iteration::yes;
};
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite));
} while (!completed);
};
shared_promise<> p;
task->_compaction_done = p.get_shared_future();
try {
while (!sstables.empty() && task->can_proceed()) {
auto sst = sstables.back();
sstables.pop_back();
co_await rewrite_sstable(sst);
}
p.set_value();
} catch (...) {
p.set_exception(std::current_exception());
throw;
}
co_await perform_task(seastar::make_shared<rewrite_sstables_compaction_task>(*this, t, std::move(options), std::move(sstables), can_purge));
}
// Task that scrubs each given sstable in validate mode, one at a time.
// Validation errors on individual sstables are logged and counted but do not
// stop the task — the point is to report on potentially corrupt sstables, not
// to fail on the first one.
class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
public:
validate_sstables_compaction_task(compaction_manager& mgr, replica::table* t, std::vector<sstables::shared_sstable> sstables)
: sstables_task(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables))
{}
protected:
// Drain the sstable queue, validating each sstable, until either the queue is
// empty or the task is asked to stop / the manager disallows proceeding.
virtual future<> do_run() override {
while (!_sstables.empty() && can_proceed()) {
auto sst = consume_sstable();
co_await validate_sstable(std::move(sst));
}
}
private:
// Validate a single sstable by running a scrub compaction in validate mode
// under the maintenance scheduling group.
// NOTE(review): sst is taken by const&, so the init-capture `sst = std::move(sst)`
// copies rather than moves — presumably an intentionally cheap shared_sstable
// ref-count bump; confirm.
future<> validate_sstable(const sstables::shared_sstable& sst) {
switch_state(state::active);
try {
// FIXME: co_await coroutine::switch_to
co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this, sst = std::move(sst)] () {
auto desc = sstables::compaction_descriptor(
{ sst },
{},
_cm._maintenance_sg.io,
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
return compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state());
});
} catch (sstables::compaction_stopped_exception&) {
// ignore, will be handled by can_proceed()
} catch (storage_io_error& e) {
// An I/O error means the node can't make progress: stop the whole
// compaction manager and propagate.
cmlog.error("{}: failed due to storage io error: {}: stopping", *this, e.what());
_cm._stats.errors++;
_cm.do_stop();
throw;
} catch (...) {
// We are validating potentially corrupt sstables, errors are
// expected, just continue with the other sstables when seeing
// one.
_cm._stats.errors++;
cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
}
}
};
future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table* t) {
if (_state != state::enabled) {
return make_ready_future<>();
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*t->get_sstables());
return run_custom_job(t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", [this, &t = *t, sstables = std::move(all_sstables)] (sstables::compaction_data& info) mutable -> future<> {
class pending_tasks {
compaction_manager::stats& _stats;
size_t _n;
public:
pending_tasks(compaction_manager::stats& stats, size_t n) : _stats(stats), _n(n) { _stats.pending_tasks += _n; }
~pending_tasks() { _stats.pending_tasks -= _n; }
void operator--(int) {
--_stats.pending_tasks;
--_n;
}
};
pending_tasks pending(_stats, sstables.size());
while (!sstables.empty()) {
auto sst = sstables.back();
sstables.pop_back();
try {
co_await with_scheduling_group(_maintenance_sg.cpu, [&] () {
auto desc = sstables::compaction_descriptor(
{ sst },
{},
_maintenance_sg.io,
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
return compact_sstables(std::move(desc), info, t.as_table_state());
});
} catch (sstables::compaction_stopped_exception&) {
throw; // let run_custom_job() handle this
} catch (storage_io_error&) {
throw; // let run_custom_job() handle this
} catch (...) {
// We are validating potentially corrupt sstables, errors are
// expected, just continue with the other sstables when seeing
// one.
_stats.errors++;
cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
}
pending--;
}
});
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
}
bool needs_cleanup(const sstables::shared_sstable& sst,

View File

@@ -107,8 +107,6 @@ public:
gate::holder _gate_holder;
sstring _description;
// FIXME: for now
friend class compaction_manager;
public:
explicit task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc)
: _cm(mgr)
@@ -125,13 +123,17 @@ public:
virtual ~task();
protected:
virtual future<> do_run() = 0;
using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;
state switch_state(state new_state);
// Return true if the task isn't stopped
// and the compaction manager allows proceeding.
inline bool can_proceed() const;
inline bool can_proceed(throw_if_stopping do_throw_if_stopping = throw_if_stopping::no) const;
void setup_new_compaction(utils::UUID output_run_id = utils::null_uuid());
void finish_compaction() noexcept;
void finish_compaction(state finish_state = state::done) noexcept;
// Compaction manager stop itself if it finds an storage I/O error which results in
// stop of transportation services. It cannot make progress anyway.
@@ -140,6 +142,8 @@ public:
future<stop_iteration> maybe_retry(std::exception_ptr err);
public:
future<> run() noexcept;
// The table this task was created for.
// NOTE(review): may be nullptr for tasks not bound to a table — confirm against task ctor.
const replica::table* compacting_table() const noexcept {
return _compacting_table;
}
@@ -160,14 +164,6 @@ public:
return _compaction_data;
}
// Read-only access to the per-table compaction state shared with the manager.
const compaction_manager::compaction_state& compaction_state() const noexcept {
return _compaction_state;
}
// Mutable access to the per-table compaction state shared with the manager.
compaction_manager::compaction_state& compaction_state() noexcept {
return _compaction_state;
}
// True while a compaction is running and an output run identifier has been
// assigned, i.e. the task is actively producing an output sstable run.
bool generating_output_run() const noexcept {
return _compaction_running && _output_run_identifier;
}
@@ -175,6 +171,10 @@ public:
return _output_run_identifier;
}
// Human-readable description of the task (set at construction, e.g. "Major compaction").
const sstring& description() const noexcept {
return _description;
}
// Returns a future that resolves when this task's compaction completes.
future<> compaction_done() noexcept {
return _compaction_done.get_future();
}
@@ -190,11 +190,29 @@ public:
std::string describe() const;
};
// Base class for compaction tasks that operate on an explicit set of
// sstables (rather than letting the compaction strategy pick candidates).
class sstables_task : public task {
protected:
// The sstables this task is responsible for processing.
std::vector<sstables::shared_sstable> _sstables;
// Replaces the tracked sstable set.
// NOTE(review): defined out of line — presumably also updates pending-task
// accounting; confirm against the .cc implementation.
void set_sstables(std::vector<sstables::shared_sstable> new_sstables);
// Removes and returns one sstable from the tracked set for processing.
sstables::shared_sstable consume_sstable();
public:
explicit sstables_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
: task(mgr, t, compaction_type, std::move(desc))
{
set_sstables(std::move(sstables));
}
virtual ~sstables_task();
};
class major_compaction_task;
class custom_compaction_task;
class regular_compaction_task;
class offstrategy_compaction_task;
class rewrite_sstables_compaction_task;
class validate_sstables_compaction_task;
class compaction_manager_test_task;
private:
@@ -255,7 +273,7 @@ private:
class strategy_control;
std::unique_ptr<strategy_control> _strategy_control;
private:
shared_ptr<task> register_task(shared_ptr<task>);
future<> perform_task(shared_ptr<task>);
future<> stop_tasks(std::vector<shared_ptr<task>> tasks, sstring reason);
@@ -417,4 +435,5 @@ public:
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);
std::ostream& operator<<(std::ostream& os, compaction_manager::task::state s);
std::ostream& operator<<(std::ostream& os, const compaction_manager::task& task);

View File

@@ -3278,15 +3278,9 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
// register partial sstable run
auto cm_test = compaction_manager_test(*cm);
auto& cdata = cm_test.register_compaction(partial_sstable_run_identifier, cf.get());
auto deregister_compaction = defer([&] () noexcept {
cm_test.deregister_compaction(cdata);
});
cf->compact_all_sstables().get();
deregister_compaction.cancel();
cm_test.deregister_compaction(cdata);
cm_test.run(partial_sstable_run_identifier, cf.get(), [cf] (sstables::compaction_data&) {
return cf->compact_all_sstables();
}).get();
// make sure partial sstable run has none of its fragments compacted.
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
@@ -3676,7 +3670,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
// no compactions done yet
auto& ss = cm.get_stats();
BOOST_REQUIRE(ss.pending_tasks == 0 && ss.active_tasks == 0 && ss.completed_tasks == 0);
BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0);
// auto compaction is enabled by default
BOOST_REQUIRE(!cf->is_auto_compaction_disabled_by_user());
// disable auto compaction by user
@@ -3707,7 +3701,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
auto stop_cf = deferred_stop(*cf);
cf->trigger_compaction();
cf->get_compaction_manager().submit(cf.get());
BOOST_REQUIRE(ss.pending_tasks == 0 && ss.active_tasks == 0 && ss.completed_tasks == 0);
BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0);
// enable auto compaction
cf->enable_auto_compaction();
// check enabled
@@ -3715,7 +3709,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
// trigger background compaction
cf->trigger_compaction();
// wait until compaction finished
do_until([&ss] { return ss.pending_tasks == 0 && ss.active_tasks == 0; }, [] {
do_until([&cm] { return cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0; }, [] {
return sleep(std::chrono::milliseconds(100));
}).wait();
// test compaction successfully finished

View File

@@ -161,13 +161,14 @@ future<compaction_result> compact_sstables(sstables::compaction_descriptor descr
return creator();
};
descriptor.replacer = std::move(replacer);
auto& cm = cf.get_compaction_manager();
auto& cdata = compaction_manager_test(cm).register_compaction(descriptor.run_identifier, &cf);
return sstables::compact_sstables(std::move(descriptor), cdata, cf.as_table_state()).then([&cdata, &cm] (sstables::compaction_result res) {
return res;
}).finally([&cm, &cdata] {
compaction_manager_test(cm).deregister_compaction(cdata);
auto cmt = compaction_manager_test(cf.get_compaction_manager());
sstables::compaction_result ret;
co_await cmt.run(descriptor.run_identifier, &cf, [&] (sstables::compaction_data& cdata) {
return sstables::compact_sstables(std::move(descriptor), cdata, cf.as_table_state()).then([&] (sstables::compaction_result res) {
ret = std::move(res);
});
});
co_return ret;
}
std::vector<std::pair<sstring, dht::token>> token_generation_for_current_shard(unsigned tokens_to_generate) {
@@ -193,17 +194,28 @@ class compaction_manager::compaction_manager_test_task : public compaction_manag
noncopyable_function<future<> (sstables::compaction_data&)> _job;
public:
compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id)
compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id, noncopyable_function<future<> (sstables::compaction_data&)> job)
: compaction_manager::task(cm, cf, sstables::compaction_type::Compaction, "Test compaction")
, _run_id(run_id)
{
// FIXME: for now
, _job(std::move(job))
{ }
protected:
// Test hook: registers a new compaction under the preset run id, then
// delegates all work to the caller-injected job.
virtual future<> do_run() override {
setup_new_compaction(_run_id);
return _job(_compaction_data);
}
};
sstables::compaction_data& compaction_manager_test::register_compaction(utils::UUID output_run_id, replica::column_family* cf) {
auto task = make_shared<compaction_manager::compaction_manager_test_task>(_cm, cf, output_run_id);
// Test helper: wraps `job` in a compaction_manager_test_task registered with
// the manager under `output_run_id`, runs it, and deregisters the compaction
// when the job finishes — `finally` ensures deregistration on both success
// and failure.
future<> compaction_manager_test::run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job) {
auto task = make_shared<compaction_manager::compaction_manager_test_task>(_cm, cf, output_run_id, std::move(job));
auto& cdata = register_compaction(task);
return task->run().finally([this, &cdata] {
deregister_compaction(cdata);
});
}
sstables::compaction_data& compaction_manager_test::register_compaction(shared_ptr<compaction_manager::task> task) {
testlog.debug("compaction_manager_test: register_compaction uuid={}: {}", task->compaction_data().compaction_uuid, *task);
_cm._tasks.push_back(task);
return task->compaction_data();
@@ -216,6 +228,6 @@ void compaction_manager_test::deregister_compaction(const sstables::compaction_d
testlog.debug("compaction_manager_test: deregister_compaction uuid={}: {}", c.compaction_uuid, *task);
_cm._tasks.erase(it);
} else {
testlog.debug("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid);
testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid);
}
}

View File

@@ -345,7 +345,9 @@ class compaction_manager_test {
public:
explicit compaction_manager_test(compaction_manager& cm) noexcept : _cm(cm) {}
sstables::compaction_data& register_compaction(utils::UUID output_run_id, replica::column_family* cf);
future<> run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job);
private:
sstables::compaction_data& register_compaction(shared_ptr<compaction_manager::task> task);
void deregister_compaction(const sstables::compaction_data& c);
};