diff --git a/compaction/compaction.cc b/compaction/compaction.cc index d92d765bc9..b9a90915e0 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -858,7 +858,7 @@ public: void compacting_sstable_writer::maybe_abort_compaction() { if (_c._info->is_stop_requested()) [[unlikely]] { // Compaction manager will catch this exception and re-schedule the compaction. - throw compaction_stop_exception(_c._info->ks_name, _c._info->cf_name, _c._info->stop_requested); + throw compaction_stopped_exception(_c._info->ks_name, _c._info->cf_name, _c._info->stop_requested); } } @@ -1281,7 +1281,7 @@ private: private: void maybe_abort_scrub() { if (_scrub_mode == compaction_type_options::scrub::mode::abort) { - throw compaction_stop_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data"); + throw compaction_aborted_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data"); } } @@ -1292,7 +1292,7 @@ private: auto pe = mutation_fragment(*_schema, _permit, partition_end{}); if (!_validator(pe)) { - throw compaction_stop_exception( + throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), "scrub compaction failed to rectify unexpected partition-start, validator rejects the injected partition-end"); @@ -1300,7 +1300,7 @@ private: push_mutation_fragment(std::move(pe)); if (!_validator(ps)) { - throw compaction_stop_exception( + throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), "scrub compaction failed to rectify unexpected partition-start, validator rejects it even after the injected partition-end"); @@ -1413,7 +1413,7 @@ private: }).handle_exception([this] (std::exception_ptr e) { try { std::rethrow_exception(std::move(e)); - } catch (const compaction_stop_exception&) { + } catch (const compaction_job_exception&) { // Propagate these unchanged. throw; } catch (const storage_io_error&) { @@ -1421,7 +1421,7 @@ private: throw; } catch (...) { // We don't want failed scrubs to be retried. - throw compaction_stop_exception( + throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), format("scrub compaction failed due to unrecoverable error: {}", std::current_exception())); @@ -1657,7 +1657,7 @@ future scrub_validate_mode_validate_reader(flat_mutation_reader reader, co while (auto mf_opt = co_await reader()) { if (info.is_stop_requested()) [[unlikely]] { // Compaction manager will catch this exception and re-schedule the compaction. - co_return coroutine::make_exception(compaction_stop_exception(info.ks_name, info.cf_name, info.stop_requested)); + co_return coroutine::make_exception(compaction_stopped_exception(info.ks_name, info.cf_name, info.stop_requested)); } const auto& mf = *mf_opt; diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index cc2bf52f18..24b84209fa 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -274,9 +274,8 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) { try { f.get(); _stats.completed_tasks++; - } catch (sstables::compaction_stop_exception& e) { + } catch (sstables::compaction_stopped_exception& e) { cmlog.info("major compaction stopped, reason: {}", e.what()); - _stats.errors++; } catch (...) { cmlog.error("major compaction failed, reason: {}", std::current_exception()); _stats.errors++; @@ -315,7 +314,7 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact _tasks.remove(task); try { f.get(); - } catch (sstables::compaction_stop_exception& e) { + } catch (sstables::compaction_stopped_exception& e) { cmlog.info("{} was abruptly stopped, reason: {}", task->type, e.what()); throw; } catch (...) { @@ -464,7 +463,7 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) { return this->task_stop(task).then_wrapped([](future <> f) { try { f.get(); - } catch (sstables::compaction_stop_exception& e) { + } catch (sstables::compaction_stopped_exception& e) { // swallow stop exception if a given procedure decides to propagate it to the caller, // as it happens with reshard and reshape. } catch (...) { @@ -537,8 +536,11 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w try { f.get(); - } catch (sstables::compaction_stop_exception& e) { + } catch (sstables::compaction_stopped_exception& e) { cmlog.info("compaction info: {}: stopping", e.what()); + } catch (sstables::compaction_aborted_exception& e) { + cmlog.error("compaction info: {}: stopping", e.what()); + _stats.errors++; } catch (storage_io_error& e) { _stats.errors++; cmlog.error("compaction failed due to storage io error: {}: stopping", e.what()); @@ -649,8 +651,11 @@ void compaction_manager::submit_offstrategy(column_family* cf) { try { f.get(); _stats.completed_tasks++; - } catch (sstables::compaction_stop_exception& e) { - cmlog.info("off-strategy compaction was abruptly stopped, reason: {}", e.what()); + } catch (sstables::compaction_stopped_exception& e) { + cmlog.info("off-strategy compaction: {}", e.what()); + } catch (sstables::compaction_aborted_exception& e) { + _stats.errors++; + cmlog.error("off-strategy compaction: {}", e.what()); } catch (...) { _stats.errors++; _stats.pending_tasks++; @@ -789,7 +794,7 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family* sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate)); return compact_sstables(std::move(desc), cf); }); - } catch (sstables::compaction_stop_exception&) { + } catch (sstables::compaction_stopped_exception&) { throw; // let run_custom_job() handle this } catch (storage_io_error&) { throw; // let run_custom_job() handle this diff --git a/sstables/exceptions.hh b/sstables/exceptions.hh index fa1434d6be..0261f455c1 100644 --- a/sstables/exceptions.hh +++ b/sstables/exceptions.hh @@ -45,14 +45,29 @@ struct bufsize_mismatch_exception : malformed_sstable_exception { {} }; -class compaction_stop_exception : public std::exception { +class compaction_job_exception : public std::exception { sstring _msg; public: - compaction_stop_exception(sstring ks, sstring cf, sstring reason) : - _msg(format("Compaction for {}/{} was stopped due to: {}", ks, cf, reason)) {} + compaction_job_exception(sstring msg) noexcept : _msg(std::move(msg)) {} const char *what() const noexcept { return _msg.c_str(); } }; +// Indicates that compaction was stopped via an external event, +// E.g. shutdown or api call. +class compaction_stopped_exception : public compaction_job_exception { +public: + compaction_stopped_exception(sstring ks, sstring cf, sstring reason) + : compaction_job_exception(format("Compaction for {}/{} was stopped due to: {}", ks, cf, reason)) {} +}; + +// Indicates that compaction hit an unrecoverable error +// and should be aborted. +class compaction_aborted_exception : public compaction_job_exception { +public: + compaction_aborted_exception(sstring ks, sstring cf, sstring reason) + : compaction_job_exception(format("Compaction for {}/{} was aborted due to: {}", ks, cf, reason)) {} +}; + } diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 37504c8749..610eead4fb 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -365,7 +365,7 @@ future sstable_directory::reshape(compaction_manager& cm, table& table }).then_wrapped([&table] (future<> f) { try { f.get(); - } catch (sstables::compaction_stop_exception& e) { + } catch (sstables::compaction_stopped_exception& e) { dirlog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); return make_ready_future(stop_iteration::yes); }