diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 7a6a262911..4fd4eca22d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -14,6 +14,7 @@ #include "replica/database.hh" #include #include +#include #include "sstables/exceptions.hh" #include "locator/abstract_replication_strategy.hh" #include "utils/fb_utilities.hh" @@ -293,6 +294,8 @@ protected: // it cannot be the other way around, or minor compaction for this table would be // prevented while an ongoing major compaction doesn't release the semaphore. virtual future<> do_run() override { + co_await coroutine::switch_to(_cm._compaction_controller.sg()); + switch_state(state::pending); auto units = co_await get_units(_cm._maintenance_ops_sem, 1); auto lock_holder = co_await _compaction_state.lock.hold_write_lock(); @@ -314,10 +317,9 @@ protected: cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name()); compaction_backlog_tracker bt(std::make_unique(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory)); _cm.register_backlog_tracker(bt); - // FIXME: co_await coroutine::switch_to - co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, t, descriptor = std::move(descriptor)] () mutable { - return t->compact_sstables(std::move(descriptor), _compaction_data); - }); + + co_await t->compact_sstables(std::move(descriptor), _compaction_data); + finish_compaction(); } }; @@ -780,7 +782,9 @@ public: protected: virtual future<> do_run() override { - for (auto completed = stop_iteration::no; !completed; ) { + co_await coroutine::switch_to(_cm._compaction_controller.sg()); + + for (;;) { if (!can_proceed()) { co_return; } @@ -790,47 +794,47 @@ protected: if (!can_proceed()) { co_return; } - // FIXME: co_await coroutine::switch_to - completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this] () mutable -> future { - replica::table& t = *_compacting_table; - sstables::compaction_strategy cs = t.get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t)); - int weight = calculate_weight(descriptor); - if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { - cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); - co_return stop_iteration::yes; - } - if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) { - cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", - descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); - finish_compaction(state::postponed); - _cm.postpone_compaction_for_table(&t); - co_return stop_iteration::yes; - } - auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); - auto weight_r = compaction_weight_registration(&_cm, weight); - descriptor.release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { - compacting.release_compacting(exhausted_sstables); - }; - cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}", - fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); + replica::table& t = *_compacting_table; + sstables::compaction_strategy cs = t.get_compaction_strategy(); + sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t)); + int weight = calculate_weight(descriptor); - setup_new_compaction(descriptor.run_identifier); - std::exception_ptr ex; + if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { + cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); + co_return; + } + if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) { + cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", + descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); + switch_state(state::postponed); + _cm.postpone_compaction_for_table(&t); + co_return; + } + auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + auto weight_r = compaction_weight_registration(&_cm, weight); + descriptor.release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { + compacting.release_compacting(exhausted_sstables); + }; + cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}", + fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); - try { - co_await t.compact_sstables(std::move(descriptor), _compaction_data); - finish_compaction(); - _cm.reevaluate_postponed_compactions(); - co_return stop_iteration::no; - } catch (...) { - ex = std::current_exception(); - } + setup_new_compaction(descriptor.run_identifier); + std::exception_ptr ex; - finish_compaction(state::failed); - co_return co_await maybe_retry(std::move(ex)); - }); + try { + co_await t.compact_sstables(std::move(descriptor), _compaction_data); + finish_compaction(); + _cm.reevaluate_postponed_compactions(); + continue; + } catch (...) { + ex = std::current_exception(); + } + + finish_compaction(state::failed); + if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) { + co_return; + } } } }; @@ -853,6 +857,8 @@ public: protected: virtual future<> do_run() override { + co_await coroutine::switch_to(_cm._maintenance_sg.cpu); + for (;;) { if (!can_proceed()) { co_return; @@ -866,9 +872,7 @@ protected: std::exception_ptr ex; try { - co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this] { - return _compacting_table->run_offstrategy_compaction(_compaction_data); - }); + co_await _compacting_table->run_offstrategy_compaction(_compaction_data); finish_compaction(); co_return; } catch (...) { @@ -916,8 +920,10 @@ protected: private: future<> rewrite_sstable(const sstables::shared_sstable& sst) { - switch_state(state::active); - for (auto completed = stop_iteration::no; !completed; ) { + co_await coroutine::switch_to(_cm._compaction_controller.sg()); + + for (;;) { + switch_state(state::active); replica::table& t = *_compacting_table; auto sstable_level = sst->get_sstable_level(); auto run_identifier = sst->run_identifier(); @@ -935,21 +941,22 @@ private: compaction_backlog_tracker user_initiated(std::make_unique(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory)); _cm.register_backlog_tracker(user_initiated); - // FIXME: use coroutine::switch_to() - completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, descriptor = std::move(descriptor)] () mutable -> future { - std::exception_ptr ex; - try { - co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data); - finish_compaction(); - _cm.reevaluate_postponed_compactions(); - co_return stop_iteration::yes; // done with current sstable - } catch (...) { - ex = std::current_exception(); - } - finish_compaction(state::failed); - co_return co_await maybe_retry(std::move(ex)); // retry current sstable or rethrows exception - }); + std::exception_ptr ex; + try { + co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data); + finish_compaction(); + _cm.reevaluate_postponed_compactions(); + co_return; // done with current sstable + } catch (...) { + ex = std::current_exception(); + } + + finish_compaction(state::failed); + // retry current sstable or rethrows exception + if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) { + co_return; + } } } }; @@ -993,20 +1000,20 @@ protected: private: future<> validate_sstable(const sstables::shared_sstable& sst) { + co_await coroutine::switch_to(_cm._maintenance_sg.cpu); + switch_state(state::active); + std::exception_ptr ex; try { - // FIXME: co_await coroutine::switch_to - co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this, sst = std::move(sst)] () { - auto desc = sstables::compaction_descriptor( - { sst }, - {}, - _cm._maintenance_sg.io, - sst->get_sstable_level(), - sstables::compaction_descriptor::default_max_sstable_bytes, - sst->run_identifier(), - sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate)); - return compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state()); - }); + auto desc = sstables::compaction_descriptor( + { sst }, + {}, + _cm._maintenance_sg.io, + sst->get_sstable_level(), + sstables::compaction_descriptor::default_max_sstable_bytes, + sst->run_identifier(), + sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate)); + co_await compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state()); } catch (sstables::compaction_stopped_exception&) { // ignore, will be handled by can_proceed() } catch (storage_io_error& e) {