diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 579ee7a2fc..d4d40e9ebb 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -1933,7 +1933,11 @@ static future scrub_sstables_validate_mode(sstables::compacti using scrub = sstables::compaction_type_options::scrub; if (validation_errors != 0 && descriptor.options.as().quarantine_sstables == scrub::quarantine_invalid_sstables::yes) { for (auto& sst : descriptor.sstables) { - co_await sst->change_state(sstables::sstable_state::quarantine); + try { + co_await sst->change_state(sstables::sstable_state::quarantine); + } catch (...) { + clogger.error("Moving {} to quarantine failed due to {}, continuing.", sst->get_filename(), std::current_exception()); + } } } diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index dbb7bc989a..1d564efede 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1839,8 +1839,21 @@ future compaction_manager::perform_sst if (!gh) { co_return compaction_stats_opt{}; } - // All sstables must be included, even the ones being compacted, such that everything in table is validated. - auto all_sstables = get_all_sstables(t); + + // Collect and register all sstables as compacting while compaction is disabled, to avoid a race condition where + // regular compaction runs in between and picks the same files. + std::vector all_sstables; + compacting_sstable_registration compacting(*this, get_compaction_state(&t)); + co_await run_with_compaction_disabled(t, [&all_sstables, &compacting, &t] () -> future<> { + // All sstables must be included. + all_sstables = get_all_sstables(t); + compacting.register_compacting(all_sstables); + return make_ready_future<>(); + }); + if (all_sstables.empty()) { + co_return compaction_stats_opt{}; + } + co_return co_await perform_compaction(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 3ead1edca2..2630a04ade 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3425,6 +3425,57 @@ SEASTAR_TEST_CASE(scrubbed_sstable_removal_test) { }); } +// Test to verify that `scrub --validate` is not affected by a concurrent regular compaction +SEASTAR_TEST_CASE(compact_uncompressed_sstable_during_scrub_validate_test) { +#ifndef SCYLLA_ENABLE_ERROR_INJECTION + fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n"); + return make_ready_future(); +#endif + return test_env::do_with_async([] (test_env& env) { + auto s = schema_builder("unlinked_sstable_scrub_test", "t1") + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("ck", utf8_type, column_kind::clustering_key) + .with_column("v", utf8_type) + .set_compressor_params(compression_parameters::no_compression()) + .build(); + auto cf = env.make_table_for_tests(s); + auto close_cf = deferred_stop(cf); + cf->disable_auto_compaction().get(); + + // Add 2 sstables to the column family + api::timestamp_type timestamp = api::min_timestamp; + for (int i = 0; i < 2; i++) { + auto mut = mutation(s, tests::generate_partition_key(s)); + mut.partition().apply_insert(*s, tests::generate_clustering_key(s), timestamp++); + auto sst = make_sstable_containing(env.make_sstable(s), {std::move(mut)}); + cf->add_sstable_and_update_cache(std::move(sst)).get(); + } + + // Start a scrub on the table; Use an injector to pause the scrub after it has collected the sstables to be scrubbed. + utils::get_local_injector().enable("sstable_validate/pause"); + sstables::compaction_type_options::scrub opts = {}; + opts.operation_mode = sstables::compaction_type_options::scrub::mode::validate; + auto scrub_task = cf->get_compaction_manager().perform_sstable_scrub(cf.as_table_state(), opts, {}); + + // When the scrub is paused, compact the two sstables in the table; this should not affect the scrub + cf->get_compaction_manager().perform_major_compaction(cf.as_table_state(), {}).get(); + + // Now resume the scrub and ensure it completes without error + utils::get_local_injector().receive_message("sstable_validate/pause"); + BOOST_REQUIRE_EQUAL(scrub_task.get().value().validation_errors, 0); + + // Test the reverse case : start a compaction and pause it, then start a scrub --validate + utils::get_local_injector().enable("major_compaction_wait"); + auto compaction_task = cf->get_compaction_manager().perform_major_compaction(cf.as_table_state(), {}); + // Perform scrub --validate while compaction is in progress + scrub_task = cf->get_compaction_manager().perform_sstable_scrub(cf.as_table_state(), opts, {}); + // Resume compaction and ensure that it doesn't interfere with the scrub + utils::get_local_injector().receive_message("major_compaction_wait"); + BOOST_REQUIRE_EQUAL(scrub_task.get().value().validation_errors, 0); + compaction_task.get(); + }); +} + SEASTAR_TEST_CASE(sstable_run_based_compaction_test) { return test_env::do_with_async([] (test_env& env) { auto builder = schema_builder("tests", "sstable_run_based_compaction_test")