From 42da7f1eb6c19bbe9ea15c2e473fc5b0df30d692 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 15 Oct 2025 13:04:32 +0200 Subject: [PATCH] scrub: add support for dropping unfixable sstables in segregate mode This patch adds a new flag `drop-unfixable-sstables` to the scrub operation in segregate mode, allowing it to automatically drop SSTables that cannot be fixed during scrub. It also includes API support for the 'drop_unfixable_sstables' parameter and validation to ensure this flag is not enabled in modes other than segregate. --- api/api-doc/storage_service.json | 8 +++ api/storage_service.cc | 7 +++ compaction/compaction.cc | 90 ++++++++++++++++++++++++--- compaction/compaction.hh | 2 +- compaction/compaction_descriptor.hh | 9 ++- compaction/compaction_manager.cc | 2 +- test/boost/sstable_compaction_test.cc | 7 ++- tools/scylla-nodetool.cc | 24 +++++-- 8 files changed, 131 insertions(+), 18 deletions(-) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 0543cd36f2..1ad32fd8f3 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -1144,6 +1144,14 @@ "allowMultiple":false, "type":"string", "paramType":"query" + }, + { + "name": "drop_unfixable_sstables", + "description": "When set to true, drop unfixable sstables. 
Applies only to scrub mode SEGREGATE.", + "required":false, + "allowMultiple":false, + "type":"boolean", + "paramType":"query" } ] } diff --git a/api/storage_service.cc b/api/storage_service.cc index 5f6ae3154b..9879421573 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -273,6 +273,13 @@ scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr(*req, "drop_unfixable_sstables", false)) { + if(scrub_mode != compaction::compaction_type_options::scrub::mode::segregate) { + throw httpd::bad_param_exception("The 'drop_unfixable_sstables' parameter is only valid when 'scrub_mode' is 'SEGREGATE'"); + } + info.opts.drop_unfixable = compaction::compaction_type_options::scrub::drop_unfixable_sstables::yes; + } + return info; } diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 6291dbcde2..55817279d7 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -1537,6 +1537,8 @@ private: mutation_fragment_stream_validator _validator; bool _skip_to_next_partition = false; uint64_t& _validation_errors; + bool& _failed_to_fix_sstable; + compaction_type_options::scrub::drop_unfixable_sstables _drop_unfixable_sstables; private: void maybe_abort_scrub(std::function report_error) { @@ -1547,7 +1549,7 @@ private: ++_validation_errors; } - void on_unexpected_partition_start(const mutation_fragment_v2& ps, sstring error) { + skip on_unexpected_partition_start(const mutation_fragment_v2& ps, sstring error) { auto report_fn = [this, error] (std::string_view action = "") { report_validation_error(compaction_type::Scrub, *_schema, error, action); }; @@ -1556,6 +1558,11 @@ private: auto pe = mutation_fragment_v2(*_schema, _permit, partition_end{}); if (!_validator(pe)) { + if (_drop_unfixable_sstables) { + _failed_to_fix_sstable = true; + end_stream(); + return skip::yes; + } throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), @@ -1564,11 +1571,17 @@ private: push_mutation_fragment(std::move(pe)); if 
(!_validator(ps)) { + if (_drop_unfixable_sstables) { + _failed_to_fix_sstable = true; + end_stream(); + return skip::yes; + } throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), "scrub compaction failed to rectify unexpected partition-start, validator rejects it even after the injected partition-end"); } + return skip::no; } skip on_invalid_partition(const dht::decorated_key& new_key, sstring error) { @@ -1596,6 +1609,11 @@ private: const auto& key = _validator.previous_partition_key(); if (_validator.current_tombstone()) { + if (_drop_unfixable_sstables) { + _failed_to_fix_sstable = true; + end_stream(); + return skip::yes; + } throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), @@ -1635,13 +1653,21 @@ private: } void on_malformed_sstable_exception(std::exception_ptr e) { - if (_scrub_mode != compaction_type_options::scrub::mode::skip) { + bool should_abort = _scrub_mode == compaction_type_options::scrub::mode::abort || + (_scrub_mode == compaction_type_options::scrub::mode::segregate && !_drop_unfixable_sstables); + if (should_abort) { throw compaction_aborted_exception( _schema->ks_name(), _schema->cf_name(), format("scrub compaction failed due to unrecoverable error: {}", e)); } + if (_drop_unfixable_sstables) { + _failed_to_fix_sstable = true; + } + end_stream(); + } + void end_stream() { // Closes the active range tombstone if needed, before emitting partition end. 
if (auto current_tombstone = _validator.current_tombstone(); current_tombstone) { const auto& last_pos = _validator.previous_position(); @@ -1662,6 +1688,10 @@ private: void fill_buffer_from_underlying() { utils::get_local_injector().inject("rest_api_keyspace_scrub_abort", [] { throw compaction_aborted_exception("", "", "scrub compaction found invalid data"); }); while (!_reader.is_buffer_empty() && !is_buffer_full()) { + if (_end_of_stream && _failed_to_fix_sstable) { + return; + } + auto mf = _reader.pop_mutation_fragment(); if (mf.is_partition_start()) { // First check that fragment kind monotonicity stands. @@ -1672,7 +1702,9 @@ private: // will confuse it. if (!_skip_to_next_partition) { if (auto res = _validator(mf); !res) { - on_unexpected_partition_start(mf, res.what()); + if (on_unexpected_partition_start(mf, res.what()) == skip::yes) { + continue; + } } // Continue processing this partition start. } @@ -1696,6 +1728,10 @@ private: push_mutation_fragment(std::move(mf)); } + if (_end_of_stream && _failed_to_fix_sstable) { + return; + } + _end_of_stream = _reader.is_end_of_stream() && _reader.is_buffer_empty(); if (_end_of_stream) { @@ -1706,12 +1742,15 @@ private: } public: - reader(mutation_reader underlying, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors) + reader(mutation_reader underlying, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, + bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables) : impl(underlying.schema(), underlying.permit()) , _scrub_mode(scrub_mode) , _reader(std::move(underlying)) , _validator(*_schema) , _validation_errors(validation_errors) + , _failed_to_fix_sstable(failed_to_fix_sstable) + , _drop_unfixable_sstables(drop_unfixable_sstables) { } virtual future<> fill_buffer() override { if (_end_of_stream) { @@ -1762,6 +1801,7 @@ private: mutable std::string _scrub_finish_description; uint64_t _bucket_count = 0; 
uint64_t _validation_errors = 0; + bool _failed_to_fix_sstable = false; public: scrub_compaction(compaction_group_view& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::scrub options, compaction_progress_monitor& progress_monitor) @@ -1793,7 +1833,7 @@ public: on_internal_error(clogger, fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range)); } auto full_scan_reader = _compacting->make_full_scan_reader(std::move(s), std::move(permit), nullptr, unwrap_monitor_generator(), sstables::integrity_check::yes); - return make_mutation_reader(std::move(full_scan_reader), _options.operation_mode, _validation_errors); + return make_mutation_reader(std::move(full_scan_reader), _options.operation_mode, _validation_errors, _failed_to_fix_sstable, _options.drop_unfixable); } uint64_t partitions_per_sstable() const override { @@ -1830,11 +1870,45 @@ public: return ret; } - friend mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors); + void drop_unfixable_sstables() { + if (!_sstables.empty() || !used_garbage_collected_sstables().empty()) { + std::vector old_sstables; + std::move(_sstables.begin(), _sstables.end(), std::back_inserter(old_sstables)); + + // Remove Garbage Collected SSTables from the SSTable set if any was previously added. 
+ auto& used_gc_sstables = used_garbage_collected_sstables(); + old_sstables.insert(old_sstables.end(), used_gc_sstables.begin(), used_gc_sstables.end()); + + _replacer(get_compaction_completion_desc(std::move(old_sstables), {})); + } + + // Mark new sstables for deletion as well + for (auto& sst : boost::range::join(_new_partial_sstables, _new_unused_sstables)) { + sst->mark_for_deletion(); + } + } + + virtual void on_end_of_compaction() override { + if (_options.drop_unfixable && _failed_to_fix_sstable) { + drop_unfixable_sstables(); + } else { + regular_compaction::on_end_of_compaction(); + } + } + + virtual void stop_sstable_writer(compaction_writer* writer) override { + if (_options.drop_unfixable && _failed_to_fix_sstable && writer) { + finish_new_sstable(writer); + } else { + regular_compaction::stop_sstable_writer(writer); + } + } + + friend mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables); }; -mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors) { - return make_mutation_reader(std::move(rd), scrub_mode, validation_errors); +mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables) { + return make_mutation_reader(std::move(rd), scrub_mode, validation_errors, failed_to_fix_sstable, drop_unfixable_sstables); } class resharding_compaction final : public compaction { diff --git a/compaction/compaction.hh b/compaction/compaction.hh index fecc233911..7a0d1dacb3 100644 --- a/compaction/compaction.hh +++ b/compaction/compaction.hh @@ -138,6 +138,6 @@ std::unordered_set get_fully_expired_sstables(const 
compaction_group_view& table_s, const std::vector& compacting, gc_clock::time_point gc_before); // For tests, can drop after we virtualize sstables. -mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors); +mutation_reader make_scrubbing_reader(mutation_reader rd, compaction_type_options::scrub::mode scrub_mode, uint64_t& validation_errors, bool& failed_to_fix_sstable, compaction_type_options::scrub::drop_unfixable_sstables drop_unfixable_sstables); } diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh index 455ebbf1a8..a3e569cc7f 100644 --- a/compaction/compaction_descriptor.hh +++ b/compaction/compaction_descriptor.hh @@ -74,6 +74,11 @@ public: // Should invalid sstables be moved into quarantine. // Only applies to validate-mode. quarantine_invalid_sstables quarantine_sstables = quarantine_invalid_sstables::yes; + + using drop_unfixable_sstables = bool_class; + // Drop sstables that cannot be fixed. + // Only applies to segregate-mode. 
+ drop_unfixable_sstables drop_unfixable = drop_unfixable_sstables::no; }; struct reshard { }; @@ -113,8 +118,8 @@ public: return compaction_type_options(upgrade{}); } - static compaction_type_options make_scrub(scrub::mode mode, scrub::quarantine_invalid_sstables quarantine_sstables = scrub::quarantine_invalid_sstables::yes) { - return compaction_type_options(scrub{.operation_mode = mode, .quarantine_sstables = quarantine_sstables}); + static compaction_type_options make_scrub(scrub::mode mode, scrub::quarantine_invalid_sstables quarantine_sstables = scrub::quarantine_invalid_sstables::yes, scrub::drop_unfixable_sstables drop_unfixable_sstables = scrub::drop_unfixable_sstables::no) { + return compaction_type_options(scrub{.operation_mode = mode, .quarantine_sstables = quarantine_sstables, .drop_unfixable = drop_unfixable_sstables}); } static compaction_type_options make_split(mutation_writer::classify_by_token_group classifier) { diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 0d3d947a10..44457849d0 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -2313,7 +2313,7 @@ future compaction_manager::perform_sst } owned_ranges_ptr owned_ranges_ptr = {}; sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode); - co_return co_await rewrite_sstables(t, compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future> { + co_return co_await rewrite_sstables(t, compaction_type_options::make_scrub(scrub_mode, opts.quarantine_sstables, opts.drop_unfixable), std::move(owned_ranges_ptr), [&t, opts] -> future> { auto all_sstables = co_await get_all_sstables(t); std::vector sstables = all_sstables | std::views::filter([&opts] (const sstables::shared_sstable& sst) { diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 6e09cb5a9a..d5034b6cdb 100644 --- 
a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3281,8 +3281,10 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { std::list> segregated_fragment_streams; uint64_t validation_errors = 0; + bool failed_to_fix_sstable = false; mutation_writer::segregate_by_partition( - make_scrubbing_reader(make_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), compaction::compaction_type_options::scrub::mode::segregate, validation_errors), + make_scrubbing_reader(make_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), compaction::compaction_type_options::scrub::mode::segregate, + validation_errors, failed_to_fix_sstable, compaction::compaction_type_options::scrub::drop_unfixable_sstables::no), mutation_writer::segregate_config{100000}, [&schema, &segregated_fragment_streams] (mutation_reader rd) { return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable { @@ -3422,8 +3424,9 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS uint64_t validation_errors = 0; + bool failed_to_fix_sstable = false; auto r = assert_that(make_scrubbing_reader(make_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)), - compaction::compaction_type_options::scrub::mode::skip, validation_errors)); + compaction::compaction_type_options::scrub::mode::skip, validation_errors, failed_to_fix_sstable, compaction::compaction_type_options::scrub::drop_unfixable_sstables::no)); for (const auto& mf : scrubbed_fragments) { testlog.info("Expecting {}", mutation_fragment_v2::printer(*schema, mf)); r.produces(*schema, mf); diff --git a/tools/scylla-nodetool.cc b/tools/scylla-nodetool.cc index 6faaa5d9db..6a488a7625 100644 --- a/tools/scylla-nodetool.cc +++ b/tools/scylla-nodetool.cc @@ -2158,19 +2158,30 @@ void scrub_operation(scylla_rest_client& client, const 
bpo::variables_map& vm) { } else { keyspaces.push_back(std::move(keyspace)); } - if (vm.contains("skip-corrupted") && vm.contains("mode")) { + + const bool has_skip_corrupted = vm.contains("skip-corrupted"); + const bool has_mode = vm.contains("mode"); + const bool has_drop_unfixable = vm.contains("drop-unfixable-sstables"); + + const sstring mode = has_mode ? vm["mode"].as() : ""; + + if (has_skip_corrupted && has_mode) { throw std::invalid_argument("cannot use --skip-corrupted when --mode is used"); } + if (has_drop_unfixable && (!has_mode || mode != "SEGREGATE")) { + throw std::invalid_argument("--drop-unfixable-sstables is only valid with --mode=SEGREGATE"); + } + std::unordered_map params; if (!tables.empty()) { params["cf"] = fmt::to_string(fmt::join(tables.begin(), tables.end(), ",")); } - if (vm.contains("mode")) { - params["scrub_mode"] = vm["mode"].as(); - } else if (vm.contains("skip-corrupted")) { + if (has_mode) { + params["scrub_mode"] = mode; + } else if (has_skip_corrupted) { params["scrub_mode"] = "SKIP"; } @@ -2182,6 +2193,10 @@ void scrub_operation(scylla_rest_client& client, const bpo::variables_map& vm) { params["disable_snapshot"] = "true"; } + if (has_drop_unfixable) { + params["drop_unfixable_sstables"] = "true"; + } + std::vector statuses; for (const auto& keyspace : keyspaces) { statuses.push_back(api::scrub_status(client.get(format("/storage_service/keyspace_scrub/{}", keyspace), params).GetInt64())); @@ -4401,6 +4416,7 @@ For more information, see: {} typed_option<>("no-validate,n", "Do not validate columns using column validator (unused)"), typed_option<>("reinsert-overflowed-ttl,r", "Rewrites rows with overflowed expiration date (unused)"), typed_option("jobs,j", "The number of sstables to be scrubbed concurrently (unused)"), + typed_option<>("drop-unfixable-sstables", "Drop unfixed sstables instead of aborting the entire scrub (only with --mode=SEGREGATE)"), }, { typed_option("keyspace", "The keyspace to scrub", 1),