scrub: add support for dropping unfixable sstables in segregate mode

This patch adds a new flag `drop-unfixable-sstables` to the scrub operation
in segregate mode, allowing to automatically drop SSTables that
cannot be fixed during scrub. It also includes API support of the 'drop_unfixable_sstables'
paramater and validation to ensure this flag is not enabled in other modes rather than segragate.
This commit is contained in:
Taras Veretilnyk
2025-10-15 13:04:32 +02:00
parent 2c74a6981b
commit 42da7f1eb6
8 changed files with 131 additions and 18 deletions

View File

@@ -1144,6 +1144,14 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name": "drop_unfixable_sstables",
"description": "When set to true, drop unfixable sstables. Applies only to scrub mode SEGREGATE.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}

View File

@@ -273,6 +273,13 @@ scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::re
throw httpd::bad_param_exception(fmt::format("Unknown argument for 'quarantine_mode' parameter: {}", quarantine_mode_str));
}
if(req_param<bool>(*req, "drop_unfixable_sstables", false)) {
if(scrub_mode != compaction::compaction_type_options::scrub::mode::segregate) {
throw httpd::bad_param_exception("The 'drop_unfixable_sstables' parameter is only valid when 'scrub_mode' is 'SEGREGATE'");
}
info.opts.drop_unfixable = compaction::compaction_type_options::scrub::drop_unfixable_sstables::yes;
}
return info;
}

View File

@@ -1537,6 +1537,8 @@ private:
mutation_fragment_stream_validator _validator;
bool _skip_to_next_partition = false;
uint64_t& _validation_errors;
bool& _failed_to_fix_sstable;
compaction_type_options::scrub::drop_unfixable_sstables _drop_unfixable_sstables;
private:
void maybe_abort_scrub(std::function<void()> report_error) {
@@ -1547,7 +1549,7 @@ private:
++_validation_errors;
}
void on_unexpected_partition_start(const mutation_fragment_v2& ps, sstring error) {
skip on_unexpected_partition_start(const mutation_fragment_v2& ps, sstring error) {
auto report_fn = [this, error] (std::string_view action = "") {
report_validation_error(compaction_type::Scrub, *_schema, error, action);
};
@@ -1556,6 +1558,11 @@ private:
auto pe = mutation_fragment_v2(*_schema, _permit, partition_end{});
if (!_validator(pe)) {
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
end_stream();
return skip::yes;
}
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
@@ -1564,11 +1571,17 @@ private:
push_mutation_fragment(std::move(pe));
if (!_validator(ps)) {
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
end_stream();
return skip::yes;
}
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
"scrub compaction failed to rectify unexpected partition-start, validator rejects it even after the injected partition-end");
}
return skip::no;
}
skip on_invalid_partition(const dht::decorated_key& new_key, sstring error) {
@@ -1596,6 +1609,11 @@ private:
const auto& key = _validator.previous_partition_key();
if (_validator.current_tombstone()) {
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
end_stream();
return skip::yes;
}
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
@@ -1635,13 +1653,21 @@ private:
}
void on_malformed_sstable_exception(std::exception_ptr e) {
if (_scrub_mode != compaction_type_options::scrub::mode::skip) {
bool should_abort = _scrub_mode == compaction_type_options::scrub::mode::abort ||
(_scrub_mode == compaction_type_options::scrub::mode::segregate && !_drop_unfixable_sstables);
if (should_abort) {
throw compaction_aborted_exception(
_schema->ks_name(),
_schema->cf_name(),
format("scrub compaction failed due to unrecoverable error: {}", e));
}
if (_drop_unfixable_sstables) {
_failed_to_fix_sstable = true;
}
end_stream();
}
void end_stream() {
// Closes the active range tombstone if needed, before emitting partition end.
if (auto current_tombstone = _validator.current_tombstone(); current_tombstone) {
const auto& last_pos = _validator.previous_position();
@@ -1662,6 +1688,10 @@ private:
void fill_buffer_from_underlying() {
utils::get_local_injector().inject("rest_api_keyspace_scrub_abort", [] { throw compaction_aborted_exception("", "", "scrub compaction found invalid data"); });
while (!_reader.is_buffer_empty() && !is_buffer_full()) {
if (_end_of_stream && _failed_to_fix_sstable) {
return;
}
auto mf = _reader.pop_mutation_fragment();
if (mf.is_partition_start()) {
// First check that fragment kind monotonicity stands.
@@ -1672,7 +1702,9 @@ private:
// will confuse it.
if (!_skip_to_next_partition) {
if (auto res = _validator(mf); !res) {
on_unexpected_partition_start(mf, res.what());
if (on_unexpected_partition_start(mf, res.what()) == skip::yes) {
continue;
}
}
// Continue processing this partition start.
}
@@ -1696,6 +1728,10 @@ private:
push_mutation_fragment(std::move(mf));
}
if (_end_of_stream && _failed_to_fix_sstable) {
return;
}
_end_of_stream = _reader.is_end_of_stream() && _reader.is_buffer_empty();
if (_end_of_stream) {
@@ -1706,12 +1742,15 @@ private:
}
public:
reader(mutation_reader underlying, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors)
reader(mutation_reader underlying, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors,
bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables)
: impl(underlying.schema(), underlying.permit())
, _scrub_mode(scrub_mode)
, _reader(std::move(underlying))
, _validator(*_schema)
, _validation_errors(validation_errors)
, _failed_to_fix_sstable(failed_to_fix_sstable)
, _drop_unfixable_sstables(drop_unfixable_sstables)
{ }
virtual future<> fill_buffer() override {
if (_end_of_stream) {
@@ -1762,6 +1801,7 @@ private:
mutable std::string _scrub_finish_description;
uint64_t _bucket_count = 0;
uint64_t _validation_errors = 0;
bool _failed_to_fix_sstable = false;
public:
scrub_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::scrub options, compaction_progress_monitor& progress_monitor)
@@ -1793,7 +1833,7 @@ public:
on_internal_error(clogger, fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range));
}
auto full_scan_reader = _compacting->make_full_scan_reader(std::move(s), std::move(permit), nullptr, unwrap_monitor_generator(), sstables::integrity_check::yes);
return make_mutation_reader<reader>(std::move(full_scan_reader), _options.operation_mode, _validation_errors);
return make_mutation_reader<reader>(std::move(full_scan_reader), _options.operation_mode, _validation_errors, _failed_to_fix_sstable, _options.drop_unfixable);
}
uint64_t partitions_per_sstable() const override {
@@ -1830,11 +1870,45 @@ public:
return ret;
}
friend mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors);
void drop_unfixable_sstables() {
if (!_sstables.empty() || !used_garbage_collected_sstables().empty()) {
std::vector<sstables::shared_sstable> old_sstables;
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(old_sstables));
// Remove Garbage Collected SSTables from the SSTable set if any was previously added.
auto& used_gc_sstables = used_garbage_collected_sstables();
old_sstables.insert(old_sstables.end(), used_gc_sstables.begin(), used_gc_sstables.end());
_replacer(get_compaction_completion_desc(std::move(old_sstables), {}));
}
// Mark new sstables for deletion as well
for (auto& sst : boost::range::join(_new_partial_sstables, _new_unused_sstables)) {
sst->mark_for_deletion();
}
}
virtual void on_end_of_compaction() override {
if (_options.drop_unfixable && _failed_to_fix_sstable) {
drop_unfixable_sstables();
} else {
regular_compaction::on_end_of_compaction();
}
}
virtual void stop_sstable_writer(compaction_writer* writer) override {
if (_options.drop_unfixable && _failed_to_fix_sstable && writer) {
finish_new_sstable(writer);
} else {
regular_compaction::stop_sstable_writer(writer);
}
}
friend mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables);
};
mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors) {
return make_mutation_reader<scrub_compaction::reader>(std::move(rd), scrub_mode, validation_errors);
mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables) {
return make_mutation_reader<scrub_compaction::reader>(std::move(rd), scrub_mode, validation_errors, failed_to_fix_sstable, drop_unfixable_sstables);
}
class resharding_compaction final : public compaction {

View File

@@ -138,6 +138,6 @@ std::unordered_set<sstables::shared_sstable>
get_fully_expired_sstables(const compaction_group_view& table_s, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point gc_before);
// For tests, can drop after we virtualize sstables.
mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors);
mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables);
}

View File

@@ -74,6 +74,11 @@ public:
// Should invalid sstables be moved into quarantine.
// Only applies to validate-mode.
quarantine_invalid_sstables quarantine_sstables = quarantine_invalid_sstables::yes;
using drop_unfixable_sstables = bool_class<class drop_unfixable_sstables_tag>;
// Drop sstables that cannot be fixed.
// Only applies to segregate-mode.
drop_unfixable_sstables drop_unfixable = drop_unfixable_sstables::no;
};
struct reshard {
};
@@ -113,8 +118,8 @@ public:
return compaction_type_options(upgrade{});
}
static compaction_type_options make_scrub(scrub::mode mode, scrub::quarantine_invalid_sstables quarantine_sstables = scrub::quarantine_invalid_sstables::yes) {
return compaction_type_options(scrub{.operation_mode = mode, .quarantine_sstables = quarantine_sstables});
static compaction_type_options make_scrub(scrub::mode mode, scrub::quarantine_invalid_sstables quarantine_sstables = scrub::quarantine_invalid_sstables::yes, scrub::drop_unfixable_sstables drop_unfixable_sstables = scrub::drop_unfixable_sstables::no) {
return compaction_type_options(scrub{.operation_mode = mode, .quarantine_sstables = quarantine_sstables, .drop_unfixable = drop_unfixable_sstables});
}
static compaction_type_options make_split(mutation_writer::classify_by_token_group classifier) {

View File

@@ -2313,7 +2313,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
}
owned_ranges_ptr owned_ranges_ptr = {};
sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode);
co_return co_await rewrite_sstables(t, compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future<std::vector<sstables::shared_sstable>> {
co_return co_await rewrite_sstables(t, compaction_type_options::make_scrub(scrub_mode, opts.quarantine_sstables, opts.drop_unfixable), std::move(owned_ranges_ptr), [&t, opts] -> future<std::vector<sstables::shared_sstable>> {
auto all_sstables = co_await get_all_sstables(t);
std::vector<sstables::shared_sstable> sstables = all_sstables
| std::views::filter([&opts] (const sstables::shared_sstable& sst) {

View File

@@ -3281,8 +3281,10 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
std::list<std::deque<mutation_fragment_v2>> segregated_fragment_streams;
uint64_t validation_errors = 0;
bool failed_to_fix_sstable = false;
mutation_writer::segregate_by_partition(
make_scrubbing_reader(make_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), compaction::compaction_type_options::scrub::mode::segregate, validation_errors),
make_scrubbing_reader(make_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), compaction::compaction_type_options::scrub::mode::segregate,
validation_errors, failed_to_fix_sstable, compaction::compaction_type_options::scrub::drop_unfixable_sstables::no),
mutation_writer::segregate_config{100000},
[&schema, &segregated_fragment_streams] (mutation_reader rd) {
return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable {
@@ -3422,8 +3424,9 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS
uint64_t validation_errors = 0;
bool failed_to_fix_sstable = false;
auto r = assert_that(make_scrubbing_reader(make_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)),
compaction::compaction_type_options::scrub::mode::skip, validation_errors));
compaction::compaction_type_options::scrub::mode::skip, validation_errors, failed_to_fix_sstable, compaction::compaction_type_options::scrub::drop_unfixable_sstables::no));
for (const auto& mf : scrubbed_fragments) {
testlog.info("Expecting {}", mutation_fragment_v2::printer(*schema, mf));
r.produces(*schema, mf);

View File

@@ -2158,19 +2158,30 @@ void scrub_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
} else {
keyspaces.push_back(std::move(keyspace));
}
if (vm.contains("skip-corrupted") && vm.contains("mode")) {
const bool has_skip_corrupted = vm.contains("skip-corrupted");
const bool has_mode = vm.contains("mode");
const bool has_drop_unfixable = vm.contains("drop-unfixable-sstables");
const sstring mode = has_mode ? vm["mode"].as<sstring>() : "";
if (has_skip_corrupted && has_mode) {
throw std::invalid_argument("cannot use --skip-corrupted when --mode is used");
}
if (has_drop_unfixable && (!has_mode || mode != "SEGREGATE")) {
throw std::invalid_argument("--drop-unfixable-sstables is only valid with --mode=SEGREGATE");
}
std::unordered_map<sstring, sstring> params;
if (!tables.empty()) {
params["cf"] = fmt::to_string(fmt::join(tables.begin(), tables.end(), ","));
}
if (vm.contains("mode")) {
params["scrub_mode"] = vm["mode"].as<sstring>();
} else if (vm.contains("skip-corrupted")) {
if (has_mode) {
params["scrub_mode"] = mode;
} else if (has_skip_corrupted) {
params["scrub_mode"] = "SKIP";
}
@@ -2182,6 +2193,10 @@ void scrub_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
params["disable_snapshot"] = "true";
}
if (has_drop_unfixable) {
params["drop_unfixable_sstables"] = "true";
}
std::vector<api::scrub_status> statuses;
for (const auto& keyspace : keyspaces) {
statuses.push_back(api::scrub_status(client.get(format("/storage_service/keyspace_scrub/{}", keyspace), params).GetInt64()));
@@ -4401,6 +4416,7 @@ For more information, see: {}
typed_option<>("no-validate,n", "Do not validate columns using column validator (unused)"),
typed_option<>("reinsert-overflowed-ttl,r", "Rewrites rows with overflowed expiration date (unused)"),
typed_option<int64_t>("jobs,j", "The number of sstables to be scrubbed concurrently (unused)"),
typed_option<>("drop-unfixable-sstables", "Drop unfixed sstables instead of aborting the entire scrub (only with --mode=SEGREGATE)"),
},
{
typed_option<sstring>("keyspace", "The keyspace to scrub", 1),