diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index d4884b049d..31be4cb196 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -710,6 +710,9 @@ protected: future<> compaction_manager::run_custom_job(compaction_group_view& t, compaction_type type, const char* desc, noncopyable_function(compaction_data&, compaction_progress_monitor&)> job, tasks::task_info info, throw_if_stopping do_throw_if_stopping) { auto gh = start_compaction(t); if (!gh) { + if (is_disabled()) { + co_return co_await coroutine::return_exception_ptr(make_disabled_exception(t)); + } co_return; } diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 1876269721..a5cd584a58 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -170,8 +170,12 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: on_internal_error(tasks::tmlogger, format("No compaction group found for table {}.{}", table.schema()->ks_name(), table.schema()->cf_name())); } auto& t = cg->view_for_unrepaired_data(); + // Resharding is mandatory for SSTable loading — if it cannot proceed (e.g. due to + // out-of-space prevention disabling compaction), we must fail loudly rather than + // silently skipping and leaving SSTables orphaned. + auto& cm = table.get_compaction_manager(); co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { - return table.get_compaction_manager().run_custom_job(t, compaction_type::Reshard, "Reshard compaction", [&] (compaction_data& info, compaction_progress_monitor& progress_monitor) -> future<> { + return cm.run_custom_job(t, compaction_type::Reshard, "Reshard compaction", [&] (compaction_data& info, compaction_progress_monitor& progress_monitor) -> future<> { auto erm = table.get_effective_replication_map(); // keep alive around compaction. compaction_descriptor desc(sstlist); @@ -184,7 +188,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: // input sstables are moved, to guarantee their resources are released once we're done // resharding them. co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result(); - }, parent_info, throw_if_stopping::no); + }, parent_info, throw_if_stopping::yes); }); }