diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 8c992aa8b8..fee554e2b8 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -21,6 +21,7 @@ #include "utils/UUID_gen.hh" #include #include +#include static logging::logger cmlog("compaction_manager"); using namespace std::chrono_literals; @@ -887,6 +888,85 @@ public: : task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction") {} + future<> run_offstrategy_compaction(sstables::compaction_data& cdata) { + // This procedure will reshape sstables in maintenance set until it's ready for + // integration into main set. + // It may require N reshape rounds before the set satisfies the strategy invariant. + // This procedure also only updates maintenance set at the end, on success. + // Otherwise, some overlapping could be introduced in the set after each reshape + // round, progressively degrading read amplification until integration happens. + // The drawback of this approach is the 2x space requirement as the old sstables + // will only be deleted at the end. The impact of this space requirement is reduced + // by the fact that off-strategy is serialized across all tables, meaning that the + // actual requirement is the size of the largest table's maintenance set. + + auto sem_unit = co_await seastar::get_units(_compaction_state.off_strategy_sem, 1); + + replica::table& t = *_compacting_table; + const auto& maintenance_sstables = t.maintenance_sstable_set(); + + cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found", + t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables.all()->size()); + + const auto old_sstables = boost::copy_range>(*maintenance_sstables.all()); + std::vector reshape_candidates = old_sstables; + std::vector sstables_to_remove; + std::unordered_set new_unused_sstables; + + auto cleanup_new_unused_sstables_on_failure = defer([&new_unused_sstables] { + for (auto& sst : new_unused_sstables) { + sst->mark_for_deletion(); + } + }); + + for (;;) { + auto& iop = service::get_local_streaming_priority(); // run reshape in maintenance mode + auto desc = t.get_compaction_strategy().get_reshaping_job(reshape_candidates, t.schema(), iop, sstables::reshape_mode::strict); + if (desc.sstables.empty()) { + // at this moment reshape_candidates contains a set of sstables ready for integration into main set + co_await t.update_sstable_lists_on_off_strategy_completion(old_sstables, reshape_candidates); + break; + } + + desc.creator = [this, &new_unused_sstables, &t] (shard_id dummy) { + auto sst = t.make_sstable(); + new_unused_sstables.insert(sst); + return sst; + }; + auto input = boost::copy_range>(desc.sstables); + + auto ret = co_await sstables::compact_sstables(std::move(desc), cdata, t.as_table_state()); + + // update list of reshape candidates without input but with output added to it + auto it = boost::remove_if(reshape_candidates, [&] (auto& s) { return input.contains(s); }); + reshape_candidates.erase(it, reshape_candidates.end()); + std::move(ret.new_sstables.begin(), ret.new_sstables.end(), std::back_inserter(reshape_candidates)); + + // If compaction strategy is unable to reshape input data in a single round, it may happen that a SSTable A + // created in round 1 will be compacted in a next round producing SSTable B. As SSTable A is no longer needed, + // it can be removed immediately. Let's remove all such SSTables immediately to reduce off-strategy space requirement. + // Input SSTables from maintenance set can only be removed later, as SSTable sets are only updated on completion. + auto can_remove_now = [&] (const sstables::shared_sstable& s) { return new_unused_sstables.contains(s); }; + for (auto&& sst : input) { + if (can_remove_now(sst)) { + co_await sst->unlink(); + new_unused_sstables.erase(std::move(sst)); + } else { + sstables_to_remove.push_back(std::move(sst)); + } + } + } + + cleanup_new_unused_sstables_on_failure.cancel(); + // By marking input sstables for deletion instead, the ones which require view building will stay in the staging + // directory until they're moved to the main dir when the time comes. Also, that allows view building to resume + // on restart if there's a crash midway. + for (auto& sst : sstables_to_remove) { + sst->mark_for_deletion(); + } + + cmlog.info("Done with off-strategy compaction for {}.{}", t.schema()->ks_name(), t.schema()->cf_name()); + } protected: virtual future<> do_run() override { co_await coroutine::switch_to(_cm._maintenance_sg.cpu); @@ -904,7 +984,7 @@ protected: std::exception_ptr ex; try { - co_await _compacting_table->run_offstrategy_compaction(_compaction_data); + co_await run_offstrategy_compaction(_compaction_data); finish_compaction(); co_return; } catch (...) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 4b20eb923c..3bfd8978ef 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -68,6 +68,10 @@ private: // Raised by any function running under run_with_compaction_disabled(); long compaction_disabled_counter = 0; + // This semaphore ensures that off-strategy compaction will be serialized for + // a given table, protecting against candidates being picked more than once. + seastar::named_semaphore off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}}; + bool compaction_disabled() const noexcept { return compaction_disabled_counter > 0; } diff --git a/replica/database.hh b/replica/database.hh index 51550426c6..7002269d47 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -439,9 +439,6 @@ private: // sstables deleted by compaction in parallel, a race condition which could // easily result in failure. seastar::named_semaphore _sstable_deletion_sem = {1, named_semaphore_exception_factory{"sstable deletion"}}; - // This semaphore ensures that off-strategy compaction will be serialized and also - // protects against candidates being picked more than once. - seastar::named_semaphore _off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}}; // Ensures that concurrent updates to sstable set will work correctly seastar::named_semaphore _sstable_set_mutation_sem = {1, named_semaphore_exception_factory{"sstable set mutation"}}; mutable row_cache _cache; // Cache covers only sstables. @@ -590,11 +587,14 @@ private: static int64_t calculate_shard_from_sstable_generation(int64_t sstable_generation) { return sstable_generation % smp::count; } - +public: + // This will update sstable lists on behalf of off-strategy compaction, where + // input files will be removed from the maintenance set and output files will + // be inserted into the main set. future<> update_sstable_lists_on_off_strategy_completion(const std::vector& old_maintenance_sstables, const std::vector& new_main_sstables); -public: + // Rebuild sstable set, delete input sstables right away, and update row cache and statistics. void on_compaction_completion(sstables::compaction_completion_desc& desc); private: @@ -900,7 +900,6 @@ public: // a future that is resolved when offstrategy_compaction completes. // The future value is true iff offstrategy compaction was required. future perform_offstrategy_compaction(); - future<> run_offstrategy_compaction(sstables::compaction_data& info); void set_compaction_strategy(sstables::compaction_strategy_type strategy); const sstables::compaction_strategy& get_compaction_strategy() const { return _compaction_strategy; diff --git a/replica/table.cc b/replica/table.cc index 187bab990d..7cc338fa12 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1044,83 +1044,6 @@ future table::perform_offstrategy_compaction() { co_return true; } -future<> table::run_offstrategy_compaction(sstables::compaction_data& info) { - // This procedure will reshape sstables in maintenance set until it's ready for - // integration into main set. - // It may require N reshape rounds before the set satisfies the strategy invariant. - // This procedure also only updates maintenance set at the end, on success. - // Otherwise, some overlapping could be introduced in the set after each reshape - // round, progressively degrading read amplification until integration happens. - // The drawback of this approach is the 2x space requirement as the old sstables - // will only be deleted at the end. The impact of this space requirement is reduced - // by the fact that off-strategy is serialized across all tables, meaning that the - // actual requirement is the size of the largest table's maintenance set. - - auto sem_unit = co_await seastar::get_units(_off_strategy_sem, 1); - - tlogger.info("Starting off-strategy compaction for {}.{}, {} candidates were found", - _schema->ks_name(), _schema->cf_name(), _maintenance_sstables->all()->size()); - - const auto old_sstables = boost::copy_range>(*_maintenance_sstables->all()); - std::vector reshape_candidates = old_sstables; - std::vector sstables_to_remove; - std::unordered_set new_unused_sstables; - - auto cleanup_new_unused_sstables_on_failure = defer([&new_unused_sstables] { - for (auto& sst : new_unused_sstables) { - sst->mark_for_deletion(); - } - }); - - for (;;) { - auto& iop = service::get_local_streaming_priority(); // run reshape in maintenance mode - auto desc = _compaction_strategy.get_reshaping_job(reshape_candidates, _schema, iop, sstables::reshape_mode::strict); - if (desc.sstables.empty()) { - // at this moment reshape_candidates contains a set of sstables ready for integration into main set - co_await update_sstable_lists_on_off_strategy_completion(old_sstables, reshape_candidates); - break; - } - - desc.creator = [this, &new_unused_sstables] (shard_id dummy) { - auto sst = make_sstable(); - new_unused_sstables.insert(sst); - return sst; - }; - auto input = boost::copy_range>(desc.sstables); - - auto ret = co_await sstables::compact_sstables(std::move(desc), info, as_table_state()); - - // update list of reshape candidates without input but with output added to it - auto it = boost::remove_if(reshape_candidates, [&] (auto& s) { return input.contains(s); }); - reshape_candidates.erase(it, reshape_candidates.end()); - std::move(ret.new_sstables.begin(), ret.new_sstables.end(), std::back_inserter(reshape_candidates)); - - // If compaction strategy is unable to reshape input data in a single round, it may happen that a SSTable A - // created in round 1 will be compacted in a next round producing SSTable B. As SSTable A is no longer needed, - // it can be removed immediately. Let's remove all such SSTables immediately to reduce off-strategy space requirement. - // Input SSTables from maintenance set can only be removed later, as SSTable sets are only updated on completion. - auto can_remove_now = [&] (const sstables::shared_sstable& s) { return new_unused_sstables.contains(s); }; - for (auto&& sst : input) { - if (can_remove_now(sst)) { - co_await sst->unlink(); - new_unused_sstables.erase(std::move(sst)); - } else { - sstables_to_remove.push_back(std::move(sst)); - } - } - } - - cleanup_new_unused_sstables_on_failure.cancel(); - // By marking input sstables for deletion instead, the ones which require view building will stay in the staging - // directory until they're moved to the main dir when the time comes. Also, that allows view building to resume - // on restart if there's a crash midway. - for (auto& sst : sstables_to_remove) { - sst->mark_for_deletion(); - } - - tlogger.info("Done with off-strategy compaction for {}.{}", _schema->ks_name(), _schema->cf_name()); -} - void table::set_compaction_strategy(sstables::compaction_strategy_type strategy) { tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy)); auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options()); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 8e89f3597e..ba456c022a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4158,8 +4158,7 @@ SEASTAR_TEST_CASE(test_offstrategy_sstable_compaction) { auto sst = make_sstable_containing(sst_gen, {mut}); cf->add_sstable_and_update_cache(std::move(sst), sstables::offstrategy::yes).get(); } - auto info = make_lw_shared(); - cf->run_offstrategy_compaction(*info).get(); + BOOST_REQUIRE(cf->perform_offstrategy_compaction().get0()); } }); }