compaction: split compaction_aborted_exception from compaction_stopped_exception

Indicate whether the compaction job should be aborted
due to an error using a new, compaction_aborted_exception type,
vs. compaction_stopped_exception that indicates
the task should be stopped due to some external event that
doesn't indicate an error (like shutdown or api call).

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-09-12 14:36:07 +03:00
parent eebe14e7bc
commit fa46bf3499
4 changed files with 39 additions and 19 deletions

View File

@@ -858,7 +858,7 @@ public:
void compacting_sstable_writer::maybe_abort_compaction() {
if (_c._info->is_stop_requested()) [[unlikely]] {
// Compaction manager will catch this exception and re-schedule the compaction.
throw compaction_stop_exception(_c._info->ks_name, _c._info->cf_name, _c._info->stop_requested);
throw compaction_stopped_exception(_c._info->ks_name, _c._info->cf_name, _c._info->stop_requested);
}
}
@@ -1281,7 +1281,7 @@ private:
private:
void maybe_abort_scrub() {
if (_scrub_mode == compaction_type_options::scrub::mode::abort) {
throw compaction_stop_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data");
throw compaction_aborted_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data");
}
}
@@ -1292,7 +1292,7 @@ private:
auto pe = mutation_fragment(*_schema, _permit, partition_end{});
if (!_validator(pe)) {
throw compaction_stop_exception(
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
"scrub compaction failed to rectify unexpected partition-start, validator rejects the injected partition-end");
@@ -1300,7 +1300,7 @@ private:
push_mutation_fragment(std::move(pe));
if (!_validator(ps)) {
throw compaction_stop_exception(
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
"scrub compaction failed to rectify unexpected partition-start, validator rejects it even after the injected partition-end");
@@ -1413,7 +1413,7 @@ private:
}).handle_exception([this] (std::exception_ptr e) {
try {
std::rethrow_exception(std::move(e));
} catch (const compaction_stop_exception&) {
} catch (const compaction_job_exception&) {
// Propagate these unchanged.
throw;
} catch (const storage_io_error&) {
@@ -1421,7 +1421,7 @@ private:
throw;
} catch (...) {
// We don't want failed scrubs to be retried.
throw compaction_stop_exception(
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
format("scrub compaction failed due to unrecoverable error: {}", std::current_exception()));
@@ -1657,7 +1657,7 @@ future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader reader, co
while (auto mf_opt = co_await reader()) {
if (info.is_stop_requested()) [[unlikely]] {
// Compaction manager will catch this exception and re-schedule the compaction.
co_return coroutine::make_exception(compaction_stop_exception(info.ks_name, info.cf_name, info.stop_requested));
co_return coroutine::make_exception(compaction_stopped_exception(info.ks_name, info.cf_name, info.stop_requested));
}
const auto& mf = *mf_opt;

View File

@@ -274,9 +274,8 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stop_exception& e) {
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("major compaction stopped, reason: {}", e.what());
_stats.errors++;
} catch (...) {
cmlog.error("major compaction failed, reason: {}", std::current_exception());
_stats.errors++;
@@ -315,7 +314,7 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact
_tasks.remove(task);
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("{} was abruptly stopped, reason: {}", task->type, e.what());
throw;
} catch (...) {
@@ -464,7 +463,7 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
return this->task_stop(task).then_wrapped([](future <> f) {
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
} catch (sstables::compaction_stopped_exception& e) {
// swallow stop exception if a given procedure decides to propagate it to the caller,
// as it happens with reshard and reshape.
} catch (...) {
@@ -537,8 +536,11 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("compaction info: {}: stopping", e.what());
} catch (sstables::compaction_aborted_exception& e) {
cmlog.error("compaction info: {}: stopping", e.what());
_stats.errors++;
} catch (storage_io_error& e) {
_stats.errors++;
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
@@ -649,8 +651,11 @@ void compaction_manager::submit_offstrategy(column_family* cf) {
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("off-strategy compaction was abruptly stopped, reason: {}", e.what());
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("off-strategy compaction: {}", e.what());
} catch (sstables::compaction_aborted_exception& e) {
_stats.errors++;
cmlog.error("off-strategy compaction: {}", e.what());
} catch (...) {
_stats.errors++;
_stats.pending_tasks++;
@@ -789,7 +794,7 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family*
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
return compact_sstables(std::move(desc), cf);
});
} catch (sstables::compaction_stop_exception&) {
} catch (sstables::compaction_stopped_exception&) {
throw; // let run_custom_job() handle this
} catch (storage_io_error&) {
throw; // let run_custom_job() handle this

View File

@@ -45,14 +45,29 @@ struct bufsize_mismatch_exception : malformed_sstable_exception {
{}
};
class compaction_stop_exception : public std::exception {
class compaction_job_exception : public std::exception {
sstring _msg;
public:
compaction_stop_exception(sstring ks, sstring cf, sstring reason) :
_msg(format("Compaction for {}/{} was stopped due to: {}", ks, cf, reason)) {}
compaction_job_exception(sstring msg) noexcept : _msg(std::move(msg)) {}
const char *what() const noexcept {
return _msg.c_str();
}
};
// Indicates that compaction was stopped via an external event,
// E.g. shutdown or api call.
class compaction_stopped_exception : public compaction_job_exception {
public:
compaction_stopped_exception(sstring ks, sstring cf, sstring reason)
: compaction_job_exception(format("Compaction for {}/{} was stopped due to: {}", ks, cf, reason)) {}
};
// Indicates that compaction hit an unrecoverable error
// and should be aborted.
class compaction_aborted_exception : public compaction_job_exception {
public:
compaction_aborted_exception(sstring ks, sstring cf, sstring reason)
: compaction_job_exception(format("Compaction for {}/{} was aborted due to: {}", ks, cf, reason)) {}
};
}

View File

@@ -365,7 +365,7 @@ future<uint64_t> sstable_directory::reshape(compaction_manager& cm, table& table
}).then_wrapped([&table] (future<> f) {
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
} catch (sstables::compaction_stopped_exception& e) {
dirlog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
return make_ready_future<stop_iteration>(stop_iteration::yes);
}