mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-02 04:56:58 +00:00
compaction: Move run_offstrategy_compaction() into compaction manager
The compaction manager currently calls back into the table to run off-strategy compaction, but that logic clearly belongs to the manager, which should perform the operation independently and only call into the table to update its state with the result.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20220315174504.107926-2-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Botond Dénes
parent
1bae803a8b
commit
58e520ab1d
@@ -21,6 +21,7 @@
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include <cmath>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
|
||||
static logging::logger cmlog("compaction_manager");
|
||||
using namespace std::chrono_literals;
|
||||
@@ -887,6 +888,85 @@ public:
|
||||
: task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
|
||||
{}
|
||||
|
||||
// Runs off-strategy compaction for the table this task is compacting (_compacting_table).
//
// The sstables found in the table's maintenance set are reshaped, possibly over several
// rounds, until the compaction strategy returns an empty reshaping job. Only then is the
// table asked, via update_sstable_lists_on_off_strategy_completion(), to replace the
// original maintenance sstables with the reshaped ones.
//
// cdata is forwarded to every sstables::compact_sstables() round.
//
// Nothing is caught here, so a failure in any awaited step propagates to the caller. In
// that case the deferred action below marks for deletion every output sstable that was
// created by this procedure and is still tracked in new_unused_sstables.
future<> run_offstrategy_compaction(sstables::compaction_data& cdata) {
    // This procedure will reshape sstables in maintenance set until it's ready for
    // integration into main set.
    // It may require N reshape rounds before the set satisfies the strategy invariant.
    // This procedure also only updates maintenance set at the end, on success.
    // Otherwise, some overlapping could be introduced in the set after each reshape
    // round, progressively degrading read amplification until integration happens.
    // The drawback of this approach is the 2x space requirement as the old sstables
    // will only be deleted at the end. The impact of this space requirement is reduced
    // by the fact that off-strategy is serialized across all tables, meaning that the
    // actual requirement is the size of the largest table's maintenance set.
    // NOTE(review): the semaphore taken below lives in _compaction_state; confirm that
    // serialization across tables is enforced elsewhere (e.g. by the caller).

    auto sem_unit = co_await seastar::get_units(_compaction_state.off_strategy_sem, 1);

    replica::table& t = *_compacting_table;
    const auto& maintenance_sstables = t.maintenance_sstable_set();

    cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found",
            t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables.all()->size());

    // old_sstables: snapshot of the maintenance set taken before any reshape round.
    // reshape_candidates: working set; inputs are replaced by outputs after each round.
    // sstables_to_remove: inputs that came from the maintenance set, deleted only at the end.
    // new_unused_sstables: outputs created here that have not been unlinked yet.
    const auto old_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*maintenance_sstables.all());
    std::vector<sstables::shared_sstable> reshape_candidates = old_sstables;
    std::vector<sstables::shared_sstable> sstables_to_remove;
    std::unordered_set<sstables::shared_sstable> new_unused_sstables;

    // Runs unless cancel() is reached below, i.e. whenever the procedure does not complete.
    auto cleanup_new_unused_sstables_on_failure = defer([&new_unused_sstables] {
        for (auto& sst : new_unused_sstables) {
            sst->mark_for_deletion();
        }
    });

    for (;;) {
        auto& iop = service::get_local_streaming_priority(); // run reshape in maintenance mode
        auto desc = t.get_compaction_strategy().get_reshaping_job(reshape_candidates, t.schema(), iop, sstables::reshape_mode::strict);
        if (desc.sstables.empty()) {
            // at this moment reshape_candidates contains a set of sstables ready for integration into main set
            co_await t.update_sstable_lists_on_off_strategy_completion(old_sstables, reshape_candidates);
            break;
        }

        // The creator only needs the table and the tracking set, so `this` is not captured.
        desc.creator = [&new_unused_sstables, &t] (shard_id dummy) {
            auto sst = t.make_sstable();
            new_unused_sstables.insert(sst);
            return sst;
        };
        // Copy the inputs before desc is moved into compact_sstables().
        auto input = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc.sstables);

        auto ret = co_await sstables::compact_sstables(std::move(desc), cdata, t.as_table_state());

        // update list of reshape candidates without input but with output added to it
        auto it = boost::remove_if(reshape_candidates, [&] (auto& s) { return input.contains(s); });
        reshape_candidates.erase(it, reshape_candidates.end());
        std::move(ret.new_sstables.begin(), ret.new_sstables.end(), std::back_inserter(reshape_candidates));

        // If compaction strategy is unable to reshape input data in a single round, it may happen that a SSTable A
        // created in round 1 will be compacted in a next round producing SSTable B. As SSTable A is no longer needed,
        // it can be removed immediately. Let's remove all such SSTables immediately to reduce off-strategy space requirement.
        // Input SSTables from maintenance set can only be removed later, as SSTable sets are only updated on completion.
        auto can_remove_now = [&] (const sstables::shared_sstable& s) { return new_unused_sstables.contains(s); };
        // Elements of an unordered_set are const, so they are copied rather than moved.
        for (const auto& sst : input) {
            if (can_remove_now(sst)) {
                co_await sst->unlink();
                new_unused_sstables.erase(sst);
            } else {
                sstables_to_remove.push_back(sst);
            }
        }
    }

    cleanup_new_unused_sstables_on_failure.cancel();
    // By marking input sstables for deletion instead, the ones which require view building will stay in the staging
    // directory until they're moved to the main dir when the time comes. Also, that allows view building to resume
    // on restart if there's a crash midway.
    for (auto& sst : sstables_to_remove) {
        sst->mark_for_deletion();
    }

    cmlog.info("Done with off-strategy compaction for {}.{}", t.schema()->ks_name(), t.schema()->cf_name());
}
|
||||
protected:
|
||||
virtual future<> do_run() override {
|
||||
co_await coroutine::switch_to(_cm._maintenance_sg.cpu);
|
||||
@@ -904,7 +984,7 @@ protected:
|
||||
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await _compacting_table->run_offstrategy_compaction(_compaction_data);
|
||||
co_await run_offstrategy_compaction(_compaction_data);
|
||||
finish_compaction();
|
||||
co_return;
|
||||
} catch (...) {
|
||||
|
||||
@@ -68,6 +68,10 @@ private:
|
||||
// Raised by any function running under run_with_compaction_disabled();
|
||||
long compaction_disabled_counter = 0;
|
||||
|
||||
// This semaphore ensures that off-strategy compaction will be serialized for
|
||||
// a given table, protecting against candidates being picked more than once.
|
||||
seastar::named_semaphore off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}};
|
||||
|
||||
// Reports whether compaction is currently disabled, i.e. whether
// compaction_disabled_counter is greater than zero.
bool compaction_disabled() const noexcept {
|
||||
return compaction_disabled_counter > 0;
|
||||
}
|
||||
|
||||
@@ -439,9 +439,6 @@ private:
|
||||
// sstables deleted by compaction in parallel, a race condition which could
|
||||
// easily result in failure.
|
||||
seastar::named_semaphore _sstable_deletion_sem = {1, named_semaphore_exception_factory{"sstable deletion"}};
|
||||
// This semaphore ensures that off-strategy compaction will be serialized and also
|
||||
// protects against candidates being picked more than once.
|
||||
seastar::named_semaphore _off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}};
|
||||
// Ensures that concurrent updates to sstable set will work correctly
|
||||
seastar::named_semaphore _sstable_set_mutation_sem = {1, named_semaphore_exception_factory{"sstable set mutation"}};
|
||||
mutable row_cache _cache; // Cache covers only sstables.
|
||||
@@ -590,11 +587,14 @@ private:
|
||||
// Maps an sstable generation number to a shard by taking the generation
// modulo smp::count.
// NOTE(review): presumably generations are never negative; a negative input
// could yield a negative result here — confirm against callers.
static int64_t calculate_shard_from_sstable_generation(int64_t sstable_generation) {
|
||||
return sstable_generation % smp::count;
|
||||
}
|
||||
|
||||
public:
|
||||
// This will update sstable lists on behalf of off-strategy compaction, where
|
||||
// input files will be removed from the maintenance set and output files will
|
||||
// be inserted into the main set.
|
||||
future<>
|
||||
update_sstable_lists_on_off_strategy_completion(const std::vector<sstables::shared_sstable>& old_maintenance_sstables,
|
||||
const std::vector<sstables::shared_sstable>& new_main_sstables);
|
||||
public:
|
||||
|
||||
// Rebuild sstable set, delete input sstables right away, and update row cache and statistics.
|
||||
void on_compaction_completion(sstables::compaction_completion_desc& desc);
|
||||
private:
|
||||
@@ -900,7 +900,6 @@ public:
|
||||
// a future<bool> that is resolved when offstrategy_compaction completes.
|
||||
// The future value is true iff offstrategy compaction was required.
|
||||
future<bool> perform_offstrategy_compaction();
|
||||
future<> run_offstrategy_compaction(sstables::compaction_data& info);
|
||||
void set_compaction_strategy(sstables::compaction_strategy_type strategy);
|
||||
const sstables::compaction_strategy& get_compaction_strategy() const {
|
||||
return _compaction_strategy;
|
||||
|
||||
@@ -1044,83 +1044,6 @@ future<bool> table::perform_offstrategy_compaction() {
|
||||
co_return true;
|
||||
}
|
||||
|
||||
// Runs off-strategy compaction on this table's maintenance set.
//
// The sstables in _maintenance_sstables are reshaped, possibly over several rounds,
// until the compaction strategy returns an empty reshaping job. Only then does
// update_sstable_lists_on_off_strategy_completion() replace the original maintenance
// sstables with the reshaped ones.
//
// info is forwarded to every sstables::compact_sstables() round.
//
// Nothing is caught here, so a failure in any awaited step propagates to the caller. In
// that case the deferred action below marks for deletion every output sstable that was
// created by this procedure and is still tracked in new_unused_sstables.
future<> table::run_offstrategy_compaction(sstables::compaction_data& info) {
    // This procedure will reshape sstables in maintenance set until it's ready for
    // integration into main set.
    // It may require N reshape rounds before the set satisfies the strategy invariant.
    // This procedure also only updates maintenance set at the end, on success.
    // Otherwise, some overlapping could be introduced in the set after each reshape
    // round, progressively degrading read amplification until integration happens.
    // The drawback of this approach is the 2x space requirement as the old sstables
    // will only be deleted at the end. The impact of this space requirement is reduced
    // by the fact that off-strategy is serialized across all tables, meaning that the
    // actual requirement is the size of the largest table's maintenance set.

    auto sem_unit = co_await seastar::get_units(_off_strategy_sem, 1);

    tlogger.info("Starting off-strategy compaction for {}.{}, {} candidates were found",
            _schema->ks_name(), _schema->cf_name(), _maintenance_sstables->all()->size());

    // old_sstables: snapshot of the maintenance set taken before any reshape round.
    // reshape_candidates: working set; inputs are replaced by outputs after each round.
    // sstables_to_remove: inputs that came from the maintenance set, deleted only at the end.
    // new_unused_sstables: outputs created here that have not been unlinked yet.
    const auto old_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_maintenance_sstables->all());
    std::vector<sstables::shared_sstable> reshape_candidates = old_sstables;
    std::vector<sstables::shared_sstable> sstables_to_remove;
    std::unordered_set<sstables::shared_sstable> new_unused_sstables;

    // Runs unless cancel() is reached below, i.e. whenever the procedure does not complete.
    auto cleanup_new_unused_sstables_on_failure = defer([&new_unused_sstables] {
        for (auto& sst : new_unused_sstables) {
            sst->mark_for_deletion();
        }
    });

    for (;;) {
        auto& iop = service::get_local_streaming_priority(); // run reshape in maintenance mode
        auto desc = _compaction_strategy.get_reshaping_job(reshape_candidates, _schema, iop, sstables::reshape_mode::strict);
        if (desc.sstables.empty()) {
            // at this moment reshape_candidates contains a set of sstables ready for integration into main set
            co_await update_sstable_lists_on_off_strategy_completion(old_sstables, reshape_candidates);
            break;
        }

        // `this` is required here: make_sstable() is a member call.
        desc.creator = [this, &new_unused_sstables] (shard_id dummy) {
            auto sst = make_sstable();
            new_unused_sstables.insert(sst);
            return sst;
        };
        // Copy the inputs before desc is moved into compact_sstables().
        auto input = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc.sstables);

        auto ret = co_await sstables::compact_sstables(std::move(desc), info, as_table_state());

        // update list of reshape candidates without input but with output added to it
        auto it = boost::remove_if(reshape_candidates, [&] (auto& s) { return input.contains(s); });
        reshape_candidates.erase(it, reshape_candidates.end());
        std::move(ret.new_sstables.begin(), ret.new_sstables.end(), std::back_inserter(reshape_candidates));

        // If compaction strategy is unable to reshape input data in a single round, it may happen that a SSTable A
        // created in round 1 will be compacted in a next round producing SSTable B. As SSTable A is no longer needed,
        // it can be removed immediately. Let's remove all such SSTables immediately to reduce off-strategy space requirement.
        // Input SSTables from maintenance set can only be removed later, as SSTable sets are only updated on completion.
        auto can_remove_now = [&] (const sstables::shared_sstable& s) { return new_unused_sstables.contains(s); };
        // Elements of an unordered_set are const, so they are copied rather than moved.
        for (const auto& sst : input) {
            if (can_remove_now(sst)) {
                co_await sst->unlink();
                new_unused_sstables.erase(sst);
            } else {
                sstables_to_remove.push_back(sst);
            }
        }
    }

    cleanup_new_unused_sstables_on_failure.cancel();
    // By marking input sstables for deletion instead, the ones which require view building will stay in the staging
    // directory until they're moved to the main dir when the time comes. Also, that allows view building to resume
    // on restart if there's a crash midway.
    for (auto& sst : sstables_to_remove) {
        sst->mark_for_deletion();
    }

    tlogger.info("Done with off-strategy compaction for {}.{}", _schema->ks_name(), _schema->cf_name());
}
|
||||
|
||||
void table::set_compaction_strategy(sstables::compaction_strategy_type strategy) {
|
||||
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy));
|
||||
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
|
||||
|
||||
@@ -4158,8 +4158,7 @@ SEASTAR_TEST_CASE(test_offstrategy_sstable_compaction) {
|
||||
auto sst = make_sstable_containing(sst_gen, {mut});
|
||||
cf->add_sstable_and_update_cache(std::move(sst), sstables::offstrategy::yes).get();
|
||||
}
|
||||
auto info = make_lw_shared<sstables::compaction_data>();
|
||||
cf->run_offstrategy_compaction(*info).get();
|
||||
BOOST_REQUIRE(cf->perform_offstrategy_compaction().get0());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user