diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 40c63ab50b..741e3f7c2a 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -254,10 +254,32 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(r } } -shared_ptr compaction_manager::register_task(shared_ptr task) { +future<> compaction_manager::perform_task(shared_ptr task) { _tasks.push_back(task); + auto unregister_task = defer([this, task] { + _tasks.remove(task); + }); cmlog.debug("{}: started", *task); - return task; + + try { + co_await task->run(); + cmlog.debug("{}: done", *task); + } catch (sstables::compaction_stopped_exception& e) { + cmlog.info("{}: stopped, reason: {}", *task, e.what()); + } catch (sstables::compaction_aborted_exception& e) { + cmlog.error("{}: aborted, reason: {}", *task, e.what()); + _stats.errors++; + throw; + } catch (storage_io_error& e) { + _stats.errors++; + cmlog.error("{}: failed due to storage io error: {}: stopping", *task, e.what()); + do_stop(); + throw; + } catch (...) { + cmlog.error("{}: failed, reason {}: stopping", *task, std::current_exception()); + _stats.errors++; + throw; + } } class compaction_manager::major_compaction_task : public compaction_manager::task { @@ -265,66 +287,76 @@ public: major_compaction_task(compaction_manager& mgr, replica::table* t) : task(mgr, t, sstables::compaction_type::Compaction, "Major compaction") {} + +protected: + // first take major compaction semaphore, then exclusely take compaction lock for table. + // it cannot be the other way around, or minor compaction for this table would be + // prevented while an ongoing major compaction doesn't release the semaphore. + virtual future<> do_run() override { + switch_state(state::pending); + auto units = co_await get_units(_cm._maintenance_ops_sem, 1); + auto lock_holder = co_await _compaction_state.lock.hold_write_lock(); + if (!can_proceed()) { + co_return; + } + + // candidates are sstables that aren't being operated on by other compaction types. + // those are eligible for major compaction. + auto* t = _compacting_table; + sstables::compaction_strategy cs = t->get_compaction_strategy(); + sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(*t)); + auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + descriptor.release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { + compacting.release_compacting(exhausted_sstables); + }; + setup_new_compaction(descriptor.run_identifier); + + cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name()); + compaction_backlog_tracker bt(std::make_unique(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory)); + _cm.register_backlog_tracker(bt); + // FIXME: co_await coroutine::switch_to + co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, t, descriptor = std::move(descriptor)] () mutable { + return t->compact_sstables(std::move(descriptor), _compaction_data); + }); + finish_compaction(); + } }; future<> compaction_manager::perform_major_compaction(replica::table* t) { if (_state != state::enabled) { return make_ready_future<>(); } - - auto task = register_task(make_shared(*this, t)); - - // first take major compaction semaphore, then exclusely take compaction lock for table. - // it cannot be the other way around, or minor compaction for this table would be - // prevented while an ongoing major compaction doesn't release the semaphore. - task->_compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, t] { - return with_lock(task->compaction_state().lock.for_write(), [this, task, t] { - _stats.active_tasks++; - if (!task->can_proceed()) { - return make_ready_future<>(); - } - - // candidates are sstables that aren't being operated on by other compaction types. - // those are eligible for major compaction. - sstables::compaction_strategy cs = t->get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), get_candidates(*t)); - auto compacting = make_lw_shared(*this, descriptor.sstables); - descriptor.release_exhausted = [compacting] (const std::vector& exhausted_sstables) { - compacting->release_compacting(exhausted_sstables); - }; - task->setup_new_compaction(descriptor.run_identifier); - - cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name()); - compaction_backlog_tracker user_initiated(std::make_unique(_compaction_controller.backlog_of_shares(200), _available_memory)); - return do_with(std::move(user_initiated), [this, t, descriptor = std::move(descriptor), task] (compaction_backlog_tracker& bt) mutable { - register_backlog_tracker(bt); - return with_scheduling_group(_compaction_controller.sg(), [this, t, descriptor = std::move(descriptor), task] () mutable { - return t->compact_sstables(std::move(descriptor), task->compaction_data()); - }); - }).then([compacting = std::move(compacting)] {}); - }); - }).then_wrapped([this, task] (future<> f) { - _stats.active_tasks--; - _tasks.remove(task); - cmlog.debug("Major compaction {}: done", *task); - try { - f.get(); - _stats.completed_tasks++; - } catch (sstables::compaction_stopped_exception& e) { - cmlog.info("major compaction stopped, reason: {}", e.what()); - } catch (...) { - cmlog.error("major compaction failed, reason: {}", std::current_exception()); - _stats.errors++; - } - }); - return task->compaction_done().then([task] {}); + return perform_task(make_shared(*this, t)); } class compaction_manager::custom_compaction_task : public compaction_manager::task { + noncopyable_function(sstables::compaction_data&)> _job; + public: - custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc) + custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc, noncopyable_function(sstables::compaction_data&)> job) : task(mgr, t, type, std::move(desc)) + , _job(std::move(job)) {} + +protected: + virtual future<> do_run() override { + if (!can_proceed(throw_if_stopping::yes)) { + co_return; + } + switch_state(state::pending); + auto units = co_await get_units(_cm._maintenance_ops_sem, 1); + + if (!can_proceed(throw_if_stopping::yes)) { + co_return; + } + setup_new_compaction(); + + // NOTE: + // no need to register shared sstables because they're excluded from non-resharding + // compaction and some of them may not even belong to current shard. + co_await _job(compaction_data()); + finish_compaction(); + } }; future<> compaction_manager::run_custom_job(replica::table* t, sstables::compaction_type type, const char* desc, noncopyable_function(sstables::compaction_data&)> job) { @@ -332,40 +364,7 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact return make_ready_future<>(); } - auto task = register_task(make_shared(*this, t, type, desc)); - - auto job_ptr = std::make_unique(sstables::compaction_data&)>>(std::move(job)); - - task->_compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, &job = *job_ptr] () mutable { - // Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run. - if (task->stopping()) { - return make_exception_future<>(task->make_compaction_stopped_exception()); - } - _stats.active_tasks++; - if (!task->can_proceed()) { - return make_ready_future<>(); - } - task->setup_new_compaction(); - - // NOTE: - // no need to register shared sstables because they're excluded from non-resharding - // compaction and some of them may not even belong to current shard. - return job(task->compaction_data()); - }).then_wrapped([this, task, job_ptr = std::move(job_ptr), type] (future<> f) { - _stats.active_tasks--; - _tasks.remove(task); - cmlog.debug("{} {}: done", type, *task); - try { - f.get(); - } catch (sstables::compaction_stopped_exception& e) { - cmlog.info("{} was abruptly stopped, reason: {}", *task, e.what()); - throw; - } catch (...) { - cmlog.error("{} failed: {}", *task, std::current_exception()); - throw; - } - }); - return task->compaction_done().then([task] {}); + return perform_task(make_shared(*this, t, type, desc, std::move(job))); } future<> @@ -426,6 +425,19 @@ compaction_manager::task::~task() { switch_state(state::none); } +compaction_manager::sstables_task::~sstables_task() { + _cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending); +} + +future<> compaction_manager::task::run() noexcept { + try { + _compaction_done = do_run(); + return compaction_done(); + } catch (...) { + return current_exception_as_future(); + } +} + compaction_manager::task::state compaction_manager::task::switch_state(state new_state) { auto old_state = std::exchange(_state, new_state); switch (old_state) { @@ -461,15 +473,40 @@ compaction_manager::task::state compaction_manager::task::switch_state(state new return old_state; } +void compaction_manager::sstables_task::set_sstables(std::vector new_sstables) { + if (!_sstables.empty()) { + on_internal_error(cmlog, format("sstables were already set")); + } + _sstables = std::move(new_sstables); + cmlog.debug("{}: set_sstables: {} sstable{}", *this, _sstables.size(), _sstables.size() > 1 ? "s" : ""); + _cm._stats.pending_tasks += _sstables.size() - (_state == state::pending); +} + +sstables::shared_sstable compaction_manager::sstables_task::consume_sstable() { + if (_sstables.empty()) { + on_internal_error(cmlog, format("no more sstables")); + } + auto sst = _sstables.back(); + _sstables.pop_back(); + --_cm._stats.pending_tasks; // from this point on, switch_state(pending|active) works the same way as any other task + cmlog.debug("{}", format("consumed {}", sst->get_filename())); + return sst; +} + void compaction_manager::task::setup_new_compaction(utils::UUID output_run_id) { _compaction_data = create_compaction_data(); _compaction_running = true; _output_run_identifier = output_run_id; + switch_state(state::active); } -void compaction_manager::task::finish_compaction() noexcept { +void compaction_manager::task::finish_compaction(state finish_state) noexcept { + switch_state(finish_state); _compaction_running = false; _output_run_identifier = utils::null_uuid(); + if (finish_state != state::failed) { + _compaction_retry.reset(); + } } void compaction_manager::task::stop(sstring reason) noexcept { @@ -581,6 +618,8 @@ void compaction_manager::postponed_compactions_reevaluation() { auto postponed = std::move(_postponed); try { for (auto& t : postponed) { + auto s = t->schema(); + cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); submit(t); } } catch (...) { @@ -694,9 +733,12 @@ inline bool compaction_manager::can_proceed(replica::table* t) const { return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled(); } -inline bool compaction_manager::task::can_proceed() const { - // Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run. +inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_stopping) const { if (stopping()) { + // Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run. + if (do_throw_if_stopping) { + throw make_compaction_stopped_exception(); + } return false; } return _cm.can_proceed(_compacting_table); @@ -720,7 +762,7 @@ future compaction_manager::task::maybe_retry(std::exception_ptr _cm._stats.errors++; cmlog.error("{}: failed: {}. Will retry in {} seconds", *this, std::current_exception(), std::chrono::duration_cast(_compaction_retry.sleep_time()).count()); - _cm._stats.pending_tasks++; + switch_state(state::pending); return _compaction_retry.retry(_compaction_data.abort).handle_exception_type([this] (sleep_aborted&) { return make_exception_future<>(make_compaction_stopped_exception()); }).then([] { @@ -737,70 +779,72 @@ public: regular_compaction_task(compaction_manager& mgr, replica::table* t) : task(mgr, t, sstables::compaction_type::Compaction, "Compaction") {} + +protected: + virtual future<> do_run() override { + for (auto completed = stop_iteration::no; !completed; ) { + if (!can_proceed()) { + co_return; + } + switch_state(state::pending); + // take read lock for table, so major and regular compaction can't proceed in parallel. + auto lock_holder = co_await _compaction_state.lock.hold_read_lock(); + if (!can_proceed()) { + co_return; + } + // FIXME: co_await coroutine::switch_to + completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this] () mutable -> future { + replica::table& t = *_compacting_table; + sstables::compaction_strategy cs = t.get_compaction_strategy(); + sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t)); + int weight = calculate_weight(descriptor); + + if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { + cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); + co_return stop_iteration::yes; + } + if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) { + cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", + descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); + finish_compaction(state::postponed); + _cm.postpone_compaction_for_table(&t); + co_return stop_iteration::yes; + } + auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + auto weight_r = compaction_weight_registration(&_cm, weight); + descriptor.release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { + compacting.release_compacting(exhausted_sstables); + }; + cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}", + fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); + + setup_new_compaction(descriptor.run_identifier); + std::exception_ptr ex; + + try { + co_await t.compact_sstables(std::move(descriptor), _compaction_data); + finish_compaction(); + _cm.reevaluate_postponed_compactions(); + co_return stop_iteration::no; + } catch (...) { + ex = std::current_exception(); + } + + finish_compaction(state::failed); + co_return co_await maybe_retry(std::move(ex)); + }); + } + } }; void compaction_manager::submit(replica::table* t) { - if (t->is_auto_compaction_disabled_by_user()) { + if (_state != state::enabled || t->is_auto_compaction_disabled_by_user()) { return; } - auto task = register_task(make_shared(*this, t)); - _stats.pending_tasks++; - - task->_compaction_done = repeat([this, task, t] () mutable { - if (!task->can_proceed()) { - _stats.pending_tasks--; - return make_ready_future(stop_iteration::yes); - } - // take read lock for table, so major and regular compaction can't proceed in parallel. - return with_lock(task->compaction_state().lock.for_read(), [this, task] () mutable { - return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable { - replica::table& t = *task->_compacting_table; - sstables::compaction_strategy cs = t.get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), get_strategy_control(), get_candidates(t)); - int weight = calculate_weight(descriptor); - - if (descriptor.sstables.empty() || !task->can_proceed() || t.is_auto_compaction_disabled_by_user()) { - _stats.pending_tasks--; - return make_ready_future(stop_iteration::yes); - } - if (!can_register_compaction(&t, weight, descriptor.fan_in())) { - _stats.pending_tasks--; - cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", - descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); - postpone_compaction_for_table(&t); - return make_ready_future(stop_iteration::yes); - } - auto compacting = make_lw_shared(*this, descriptor.sstables); - auto weight_r = compaction_weight_registration(this, weight); - descriptor.release_exhausted = [compacting] (const std::vector& exhausted_sstables) { - compacting->release_compacting(exhausted_sstables); - }; - cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}", - descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); - - _stats.pending_tasks--; - _stats.active_tasks++; - task->setup_new_compaction(descriptor.run_identifier); - return t.compact_sstables(std::move(descriptor), task->compaction_data()).then_wrapped([this, task, compacting = std::move(compacting), weight_r = std::move(weight_r)] (future<> f) mutable { - _stats.active_tasks--; - task->finish_compaction(); - - if (f.failed()) { - return task->maybe_retry(f.get_exception()); - } - _stats.pending_tasks++; - _stats.completed_tasks++; - task->_compaction_retry.reset(); - reevaluate_postponed_compactions(); - return make_ready_future(stop_iteration::no); - }); - }); - }); - }).finally([this, task] { - _tasks.remove(task); - cmlog.debug("Compaction {}: done", *task); - }); + // OK to drop future. + // waited via task->stop() + (void)perform_task(make_shared(*this, t)); } class compaction_manager::offstrategy_compaction_task : public compaction_manager::task { @@ -808,189 +852,187 @@ public: offstrategy_compaction_task(compaction_manager& mgr, replica::table* t) : task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction") {} + +protected: + virtual future<> do_run() override { + for (;;) { + if (!can_proceed()) { + co_return; + } + switch_state(state::pending); + auto units = co_await get_units(_cm._maintenance_ops_sem, 1); + if (!can_proceed()) { + co_return; + } + setup_new_compaction(); + + std::exception_ptr ex; + try { + co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this] { + return _compacting_table->run_offstrategy_compaction(_compaction_data); + }); + finish_compaction(); + co_return; + } catch (...) { + ex = std::current_exception(); + } + + finish_compaction(state::failed); + if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) { + co_return; + } + } + } }; future<> compaction_manager::perform_offstrategy(replica::table* t) { - auto task = register_task(make_shared(*this, t)); - _stats.pending_tasks++; - - task->_compaction_done = repeat([this, task, t] () mutable { - if (!task->can_proceed()) { - _stats.pending_tasks--; - return make_ready_future(stop_iteration::yes); - } - return with_semaphore(_maintenance_ops_sem, 1, [this, task, t] () mutable { - _stats.pending_tasks--; - if (!task->can_proceed()) { - return make_ready_future(stop_iteration::yes); - } - _stats.active_tasks++; - task->setup_new_compaction(); - - return with_scheduling_group(_maintenance_sg.cpu, [this, task, t] { - return t->run_offstrategy_compaction(task->compaction_data()).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable { - _stats.active_tasks--; - task->finish_compaction(); - if (!f.failed()) { - _stats.completed_tasks++; - return make_ready_future(stop_iteration::yes); - } - return task->maybe_retry(f.get_exception()); - }); - }); - }); - }).finally([this, task] { - _tasks.remove(task); - cmlog.debug("Offstrategy compaction {}: done", *task); - }); - return task->compaction_done().finally([task] {}); + if (_state != state::enabled) { + return make_ready_future<>(); + } + return perform_task(make_shared(*this, t)); } -class compaction_manager::rewrite_sstables_compaction_task : public compaction_manager::task { +class compaction_manager::rewrite_sstables_compaction_task : public compaction_manager::sstables_task { + sstables::compaction_type_options _options; + compacting_sstable_registration _compacting; + can_purge_tombstones _can_purge; + public: - rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, const sstables::compaction_type_options& options) - : task(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) + rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options, std::vector sstables, can_purge_tombstones can_purge) + : sstables_task(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables)) + , _options(std::move(options)) + , _compacting(mgr, _sstables) + , _can_purge(can_purge) {} + +protected: + virtual future<> do_run() override { + switch_state(state::pending); + auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1); + + while (!_sstables.empty() && can_proceed()) { + auto sst = consume_sstable(); + co_await rewrite_sstable(std::move(sst)); + } + } + +private: + future<> rewrite_sstable(const sstables::shared_sstable& sst) { + switch_state(state::active); + for (auto completed = stop_iteration::no; !completed; ) { + replica::table& t = *_compacting_table; + auto sstable_level = sst->get_sstable_level(); + auto run_identifier = sst->run_identifier(); + auto sstable_set_snapshot = _can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt; + // FIXME: this compaction should run with maintenance priority. + auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(), + sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options); + + // Releases reference to cleaned sstable such that respective used disk space can be freed. + descriptor.release_exhausted = [this] (const std::vector& exhausted_sstables) { + _compacting.release_compacting(exhausted_sstables); + }; + + setup_new_compaction(descriptor.run_identifier); + + compaction_backlog_tracker user_initiated(std::make_unique(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory)); + _cm.register_backlog_tracker(user_initiated); + // FIXME: use coroutine::switch_to() + completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, descriptor = std::move(descriptor)] () mutable -> future { + std::exception_ptr ex; + try { + co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data); + finish_compaction(); + _cm.reevaluate_postponed_compactions(); + co_return stop_iteration::yes; // done with current sstable + } catch (...) { + ex = std::current_exception(); + } + + finish_compaction(state::failed); + co_return co_await maybe_retry(std::move(ex)); // retry current sstable or rethrows exception + }); + } + } }; future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { - std::vector sstables; - compacting_sstable_registration compacting(*this); + if (_state != state::enabled) { + co_return; + } // since we might potentially have ongoing compactions, and we // must ensure that all sstables created before we run are included // in the re-write, we need to barrier out any previously running // compaction. - co_await run_with_compaction_disabled(t, [&] () mutable -> future<> { + std::vector sstables; + co_await run_with_compaction_disabled(t, [this, &sstables, get_func = std::move(get_func)] () -> future<> { sstables = co_await get_func(); - compacting.register_compacting(sstables); + + // sort sstables by size in descending order, such that the smallest files will be rewritten first + // (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher + // chance to succeed when the biggest files are reached. + std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) { + return a->data_size() > b->data_size(); + }); }); - // sort sstables by size in descending order, such that the smallest files will be rewritten first - // (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher - // chance to succeed when the biggest files are reached. - std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) { - return a->data_size() > b->data_size(); - }); - - auto task = register_task(make_shared(*this, t, options)); - - _stats.pending_tasks += sstables.size(); - - auto task_completion = defer([this, &task, &sstables, &options] { - _stats.pending_tasks -= sstables.size(); - _tasks.remove(task); - cmlog.debug("{} {}: done", options.type(), *task); - }); - - auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1); - - auto rewrite_sstable = [this, &task, &options, &compacting, can_purge] (const sstables::shared_sstable& sst) mutable -> future<> { - stop_iteration completed = stop_iteration::no; - do { - replica::table& t = *task->_compacting_table; - auto sstable_level = sst->get_sstable_level(); - auto run_identifier = sst->run_identifier(); - auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt; - // FIXME: this compaction should run with maintenance priority. - auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(), - sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options); - - // Releases reference to cleaned sstable such that respective used disk space can be freed. - descriptor.release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { - compacting.release_compacting(exhausted_sstables); - }; - - _stats.pending_tasks--; - _stats.active_tasks++; - task->setup_new_compaction(descriptor.run_identifier); - - auto perform_rewrite = [this, &t, &descriptor, &task] () mutable -> future { - std::exception_ptr ex; - try { - auto compaction_completion = defer([&task, this] { - task->finish_compaction(); - _stats.active_tasks--; - }); - co_await t.compact_sstables(std::move(descriptor), task->compaction_data()); - } catch (...) { - ex = std::current_exception(); - } - if (ex) { - co_return co_await task->maybe_retry(std::move(ex)); - } - _stats.completed_tasks++; - reevaluate_postponed_compactions(); - co_return stop_iteration::yes; - }; - - compaction_backlog_tracker user_initiated(std::make_unique(_compaction_controller.backlog_of_shares(200), _available_memory)); - completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite)); - } while (!completed); - }; - - shared_promise<> p; - task->_compaction_done = p.get_shared_future(); - try { - while (!sstables.empty() && task->can_proceed()) { - auto sst = sstables.back(); - sstables.pop_back(); - co_await rewrite_sstable(sst); - } - p.set_value(); - } catch (...) { - p.set_exception(std::current_exception()); - throw; - } + co_await perform_task(seastar::make_shared(*this, t, std::move(options), std::move(sstables), can_purge)); } +class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task { +public: + validate_sstables_compaction_task(compaction_manager& mgr, replica::table* t, std::vector sstables) + : sstables_task(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables)) + {} + +protected: + virtual future<> do_run() override { + while (!_sstables.empty() && can_proceed()) { + auto sst = consume_sstable(); + co_await validate_sstable(std::move(sst)); + } + } + +private: + future<> validate_sstable(const sstables::shared_sstable& sst) { + switch_state(state::active); + try { + // FIXME: co_await coroutine::switch_to + co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this, sst = std::move(sst)] () { + auto desc = sstables::compaction_descriptor( + { sst }, + {}, + _cm._maintenance_sg.io, + sst->get_sstable_level(), + sstables::compaction_descriptor::default_max_sstable_bytes, + sst->run_identifier(), + sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate)); + return compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state()); + }); + } catch (sstables::compaction_stopped_exception&) { + // ignore, will be handled by can_proceed() + } catch (storage_io_error& e) { + cmlog.error("{}: failed due to storage io error: {}: stopping", *this, e.what()); + _cm._stats.errors++; + _cm.do_stop(); + throw; + } catch (...) { + // We are validating potentially corrupt sstables, errors are + // expected, just continue with the other sstables when seeing + // one. + _cm._stats.errors++; + cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception()); + } + } +}; + future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table* t) { + if (_state != state::enabled) { + return make_ready_future<>(); + } // All sstables must be included, even the ones being compacted, such that everything in table is validated. auto all_sstables = boost::copy_range>(*t->get_sstables()); - return run_custom_job(t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", [this, &t = *t, sstables = std::move(all_sstables)] (sstables::compaction_data& info) mutable -> future<> { - class pending_tasks { - compaction_manager::stats& _stats; - size_t _n; - public: - pending_tasks(compaction_manager::stats& stats, size_t n) : _stats(stats), _n(n) { _stats.pending_tasks += _n; } - ~pending_tasks() { _stats.pending_tasks -= _n; } - void operator--(int) { - --_stats.pending_tasks; - --_n; - } - }; - pending_tasks pending(_stats, sstables.size()); - - while (!sstables.empty()) { - auto sst = sstables.back(); - sstables.pop_back(); - - try { - co_await with_scheduling_group(_maintenance_sg.cpu, [&] () { - auto desc = sstables::compaction_descriptor( - { sst }, - {}, - _maintenance_sg.io, - sst->get_sstable_level(), - sstables::compaction_descriptor::default_max_sstable_bytes, - sst->run_identifier(), - sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate)); - return compact_sstables(std::move(desc), info, t.as_table_state()); - }); - } catch (sstables::compaction_stopped_exception&) { - throw; // let run_custom_job() handle this - } catch (storage_io_error&) { - throw; // let run_custom_job() handle this - } catch (...) { - // We are validating potentially corrupt sstables, errors are - // expected, just continue with the other sstables when seeing - // one. - _stats.errors++; - cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception()); - } - - pending--; - } - }); + return perform_task(seastar::make_shared(*this, t, std::move(all_sstables))); } bool needs_cleanup(const sstables::shared_sstable& sst, diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 30859fd014..d9ad0041a3 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -107,8 +107,6 @@ public: gate::holder _gate_holder; sstring _description; - // FIXME: for now - friend class compaction_manager; public: explicit task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc) : _cm(mgr) @@ -125,13 +123,17 @@ public: virtual ~task(); protected: + virtual future<> do_run() = 0; + + using throw_if_stopping = bool_class; + state switch_state(state new_state); // Return true if the task isn't stopped // and the compaction manager allows proceeding. - inline bool can_proceed() const; + inline bool can_proceed(throw_if_stopping do_throw_if_stopping = throw_if_stopping::no) const; void setup_new_compaction(utils::UUID output_run_id = utils::null_uuid()); - void finish_compaction() noexcept; + void finish_compaction(state finish_state = state::done) noexcept; // Compaction manager stop itself if it finds an storage I/O error which results in // stop of transportation services. It cannot make progress anyway. @@ -140,6 +142,8 @@ public: future maybe_retry(std::exception_ptr err); public: + future<> run() noexcept; + const replica::table* compacting_table() const noexcept { return _compacting_table; } @@ -160,14 +164,6 @@ public: return _compaction_data; } - const compaction_manager::compaction_state& compaction_state() const noexcept { - return _compaction_state; - } - - compaction_manager::compaction_state& compaction_state() noexcept { - return _compaction_state; - } - bool generating_output_run() const noexcept { return _compaction_running && _output_run_identifier; } @@ -175,6 +171,10 @@ public: return _output_run_identifier; } + const sstring& description() const noexcept { + return _description; + } + future<> compaction_done() noexcept { return _compaction_done.get_future(); } @@ -190,11 +190,29 @@ public: std::string describe() const; }; + class sstables_task : public task { + protected: + std::vector _sstables; + + void set_sstables(std::vector new_sstables); + sstables::shared_sstable consume_sstable(); + + public: + explicit sstables_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type compaction_type, sstring desc, std::vector sstables) + : task(mgr, t, compaction_type, std::move(desc)) + { + set_sstables(std::move(sstables)); + } + + virtual ~sstables_task(); + }; + class major_compaction_task; class custom_compaction_task; class regular_compaction_task; class offstrategy_compaction_task; class rewrite_sstables_compaction_task; + class validate_sstables_compaction_task; class compaction_manager_test_task; private: @@ -255,7 +273,7 @@ private: class strategy_control; std::unique_ptr _strategy_control; private: - shared_ptr register_task(shared_ptr); + future<> perform_task(shared_ptr); future<> stop_tasks(std::vector> tasks, sstring reason); @@ -417,4 +435,5 @@ public: bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s); +std::ostream& operator<<(std::ostream& os, compaction_manager::task::state s); std::ostream& operator<<(std::ostream& os, const compaction_manager::task& task); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index f28c5157b0..b5dd1db9fa 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3278,15 +3278,9 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) { // register partial sstable run auto cm_test = compaction_manager_test(*cm); - auto& cdata = cm_test.register_compaction(partial_sstable_run_identifier, cf.get()); - auto deregister_compaction = defer([&] () noexcept { - cm_test.deregister_compaction(cdata); - }); - - cf->compact_all_sstables().get(); - - deregister_compaction.cancel(); - cm_test.deregister_compaction(cdata); + cm_test.run(partial_sstable_run_identifier, cf.get(), [cf] (sstables::compaction_data&) { + return cf->compact_all_sstables(); + }).get(); // make sure partial sstable run has none of its fragments compacted. BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation())); @@ -3676,7 +3670,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) { // no compactions done yet auto& ss = cm.get_stats(); - BOOST_REQUIRE(ss.pending_tasks == 0 && ss.active_tasks == 0 && ss.completed_tasks == 0); + BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0); // auto compaction is enabled by default BOOST_REQUIRE(!cf->is_auto_compaction_disabled_by_user()); // disable auto compaction by user @@ -3707,7 +3701,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) { auto stop_cf = deferred_stop(*cf); cf->trigger_compaction(); cf->get_compaction_manager().submit(cf.get()); - BOOST_REQUIRE(ss.pending_tasks == 0 && ss.active_tasks == 0 && ss.completed_tasks == 0); + BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0); // enable auto compaction cf->enable_auto_compaction(); // check enabled @@ -3715,7 +3709,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) { // trigger background compaction cf->trigger_compaction(); // wait until compaction finished - do_until([&ss] { return ss.pending_tasks == 0 && ss.active_tasks == 0; }, [] { + do_until([&cm] { return cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0; }, [] { return sleep(std::chrono::milliseconds(100)); }).wait(); // test compaction successfully finished diff --git a/test/lib/sstable_utils.cc b/test/lib/sstable_utils.cc index 401ff45255..7f90020306 100644 --- a/test/lib/sstable_utils.cc +++ b/test/lib/sstable_utils.cc @@ -161,13 +161,14 @@ future compact_sstables(sstables::compaction_descriptor descr return creator(); }; descriptor.replacer = std::move(replacer); - auto& cm = cf.get_compaction_manager(); - auto& cdata = compaction_manager_test(cm).register_compaction(descriptor.run_identifier, &cf); - return sstables::compact_sstables(std::move(descriptor), cdata, cf.as_table_state()).then([&cdata, &cm] (sstables::compaction_result res) { - return res; - }).finally([&cm, &cdata] { - compaction_manager_test(cm).deregister_compaction(cdata); + auto cmt = compaction_manager_test(cf.get_compaction_manager()); + sstables::compaction_result ret; + co_await cmt.run(descriptor.run_identifier, &cf, [&] (sstables::compaction_data& cdata) { + return sstables::compact_sstables(std::move(descriptor), cdata, cf.as_table_state()).then([&] (sstables::compaction_result res) { + ret = std::move(res); + }); }); + co_return ret; } std::vector> token_generation_for_current_shard(unsigned tokens_to_generate) { @@ -193,17 +194,28 @@ class compaction_manager::compaction_manager_test_task : public compaction_manag noncopyable_function (sstables::compaction_data&)> _job; public: - compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id) + compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id, noncopyable_function (sstables::compaction_data&)> job) : compaction_manager::task(cm, cf, sstables::compaction_type::Compaction, "Test compaction") , _run_id(run_id) - { - // FIXME: for now + , _job(std::move(job)) + { } + +protected: + virtual future<> do_run() override { setup_new_compaction(_run_id); + return _job(_compaction_data); } }; -sstables::compaction_data& compaction_manager_test::register_compaction(utils::UUID output_run_id, replica::column_family* cf) { - auto task = make_shared(_cm, cf, output_run_id); +future<> compaction_manager_test::run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function (sstables::compaction_data&)> job) { + auto task = make_shared(_cm, cf, output_run_id, std::move(job)); + auto& cdata = register_compaction(task); + return task->run().finally([this, &cdata] { + deregister_compaction(cdata); + }); +} + +sstables::compaction_data& compaction_manager_test::register_compaction(shared_ptr task) { testlog.debug("compaction_manager_test: register_compaction uuid={}: {}", task->compaction_data().compaction_uuid, *task); _cm._tasks.push_back(task); return task->compaction_data(); @@ -216,6 +228,6 @@ void compaction_manager_test::deregister_compaction(const sstables::compaction_d testlog.debug("compaction_manager_test: deregister_compaction uuid={}: {}", c.compaction_uuid, *task); _cm._tasks.erase(it); } else { - testlog.debug("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid); + testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid); } } diff --git a/test/lib/sstable_utils.hh b/test/lib/sstable_utils.hh index 3c0ac76d4a..a8f4e1e419 100644 --- a/test/lib/sstable_utils.hh +++ b/test/lib/sstable_utils.hh @@ -345,7 +345,9 @@ class compaction_manager_test { public: explicit compaction_manager_test(compaction_manager& cm) noexcept : _cm(cm) {} - sstables::compaction_data& register_compaction(utils::UUID output_run_id, replica::column_family* cf); + future<> run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function (sstables::compaction_data&)> job); +private: + sstables::compaction_data& register_compaction(shared_ptr task); void deregister_compaction(const sstables::compaction_data& c); };