compaction_manager: use coroutine::switch_to

Saving an allocation for running the functor
as a task in the switched-to scheduling group.

Also, switch to the desired scheduling group at
the beginning of the task so that the higher level logic,
like getting the list of sstables to compact
will be performed under the desired scheduling group,
not only the compaction code itself.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-02-22 20:54:06 +02:00
parent 8c66916652
commit 5e1fda7e1d

View File

@@ -14,6 +14,7 @@
#include "replica/database.hh"
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/switch_to.hh>
#include "sstables/exceptions.hh"
#include "locator/abstract_replication_strategy.hh"
#include "utils/fb_utilities.hh"
@@ -293,6 +294,8 @@ protected:
// it cannot be the other way around, or minor compaction for this table would be
// prevented while an ongoing major compaction doesn't release the semaphore.
virtual future<> do_run() override {
co_await coroutine::switch_to(_cm._compaction_controller.sg());
switch_state(state::pending);
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
@@ -314,10 +317,9 @@ protected:
cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
compaction_backlog_tracker bt(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
_cm.register_backlog_tracker(bt);
// FIXME: co_await coroutine::switch_to
co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, t, descriptor = std::move(descriptor)] () mutable {
return t->compact_sstables(std::move(descriptor), _compaction_data);
});
co_await t->compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
}
};
@@ -780,7 +782,9 @@ public:
protected:
virtual future<> do_run() override {
for (auto completed = stop_iteration::no; !completed; ) {
co_await coroutine::switch_to(_cm._compaction_controller.sg());
for (;;) {
if (!can_proceed()) {
co_return;
}
@@ -790,47 +794,47 @@ protected:
if (!can_proceed()) {
co_return;
}
// FIXME: co_await coroutine::switch_to
completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this] () mutable -> future<stop_iteration> {
replica::table& t = *_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t));
int weight = calculate_weight(descriptor);
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user());
co_return stop_iteration::yes;
}
if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) {
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
finish_compaction(state::postponed);
_cm.postpone_compaction_for_table(&t);
co_return stop_iteration::yes;
}
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
auto weight_r = compaction_weight_registration(&_cm, weight);
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
};
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}",
fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
replica::table& t = *_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t));
int weight = calculate_weight(descriptor);
setup_new_compaction(descriptor.run_identifier);
std::exception_ptr ex;
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user());
co_return;
}
if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) {
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
switch_state(state::postponed);
_cm.postpone_compaction_for_table(&t);
co_return;
}
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
auto weight_r = compaction_weight_registration(&_cm, weight);
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
};
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}",
fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
try {
co_await t.compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return stop_iteration::no;
} catch (...) {
ex = std::current_exception();
}
setup_new_compaction(descriptor.run_identifier);
std::exception_ptr ex;
finish_compaction(state::failed);
co_return co_await maybe_retry(std::move(ex));
});
try {
co_await t.compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
_cm.reevaluate_postponed_compactions();
continue;
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
co_return;
}
}
}
};
@@ -853,6 +857,8 @@ public:
protected:
virtual future<> do_run() override {
co_await coroutine::switch_to(_cm._maintenance_sg.cpu);
for (;;) {
if (!can_proceed()) {
co_return;
@@ -866,9 +872,7 @@ protected:
std::exception_ptr ex;
try {
co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this] {
return _compacting_table->run_offstrategy_compaction(_compaction_data);
});
co_await _compacting_table->run_offstrategy_compaction(_compaction_data);
finish_compaction();
co_return;
} catch (...) {
@@ -916,8 +920,10 @@ protected:
private:
future<> rewrite_sstable(const sstables::shared_sstable& sst) {
switch_state(state::active);
for (auto completed = stop_iteration::no; !completed; ) {
co_await coroutine::switch_to(_cm._compaction_controller.sg());
for (;;) {
switch_state(state::active);
replica::table& t = *_compacting_table;
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
@@ -935,21 +941,22 @@ private:
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
_cm.register_backlog_tracker(user_initiated);
// FIXME: use coroutine::switch_to()
completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, descriptor = std::move(descriptor)] () mutable -> future<stop_iteration> {
std::exception_ptr ex;
try {
co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return stop_iteration::yes; // done with current sstable
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
co_return co_await maybe_retry(std::move(ex)); // retry current sstable or rethrows exception
});
std::exception_ptr ex;
try {
co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data);
finish_compaction();
_cm.reevaluate_postponed_compactions();
co_return; // done with current sstable
} catch (...) {
ex = std::current_exception();
}
finish_compaction(state::failed);
// retry current sstable or rethrows exception
if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
co_return;
}
}
}
};
@@ -993,20 +1000,20 @@ protected:
private:
future<> validate_sstable(const sstables::shared_sstable& sst) {
co_await coroutine::switch_to(_cm._maintenance_sg.cpu);
switch_state(state::active);
std::exception_ptr ex;
try {
// FIXME: co_await coroutine::switch_to
co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this, sst = std::move(sst)] () {
auto desc = sstables::compaction_descriptor(
{ sst },
{},
_cm._maintenance_sg.io,
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
return compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state());
});
auto desc = sstables::compaction_descriptor(
{ sst },
{},
_cm._maintenance_sg.io,
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
co_await compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state());
} catch (sstables::compaction_stopped_exception&) {
// ignore, will be handled by can_proceed()
} catch (storage_io_error& e) {