compaction: add functions to register and deregister compacting sstables

Reviewed-by: Nadav Har'El <nyh@scylladb.com>
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2016-06-23 16:31:29 -03:00
parent 4d6dce8ec9
commit da6a2b429d
2 changed files with 21 additions and 13 deletions

View File

@@ -124,6 +124,19 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const c
return candidates;
}
void compaction_manager::register_compacting_sstables(const std::vector<sstables::shared_sstable>& sstables) {
for (auto& sst : sstables) {
_compacting_sstables.insert(sst);
}
}
void compaction_manager::deregister_compacting_sstables(const std::vector<sstables::shared_sstable>& sstables) {
// Remove compacted sstables from the set of compacting sstables.
for (auto& sst : sstables) {
_compacting_sstables.erase(sst);
}
}
lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_family* cf, bool cleanup) {
// NOTE: Compaction code runs in parallel to the rest of the system.
// When it's time to shutdown, we need to prevent any new compaction
@@ -148,18 +161,12 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
// Created to erase sstables from _compacting_sstables after compaction finishes.
std::vector<sstables::shared_sstable> sstables_to_compact;
int weight = -1;
auto keep_track_of_compacting_sstables = [this, &sstables_to_compact, &descriptor] {
sstables_to_compact.reserve(descriptor.sstables.size());
for (auto& sst : descriptor.sstables) {
sstables_to_compact.push_back(sst);
_compacting_sstables.insert(sst);
}
};
future<> operation = make_ready_future<>();
if (task->cleanup) {
descriptor = sstables::compaction_descriptor(std::move(candidates));
keep_track_of_compacting_sstables();
sstables_to_compact = descriptor.sstables;
register_compacting_sstables(sstables_to_compact);
operation = cf.cleanup_sstables(std::move(descriptor));
} else {
sstables::compaction_strategy cs = cf.get_compaction_strategy();
@@ -173,7 +180,8 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
keep_track_of_compacting_sstables();
sstables_to_compact = descriptor.sstables;
register_compacting_sstables(sstables_to_compact);
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
operation = cf.run_compaction(std::move(descriptor));
@@ -186,10 +194,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
task->compaction_retry.reset();
return make_ready_future<>();
}).then_wrapped([this, task, weight, sstables_to_compact = std::move(sstables_to_compact)] (future<> f) {
// Remove compacted sstables from the set of compacting sstables.
for (auto& sst : sstables_to_compact) {
_compacting_sstables.erase(sst);
}
deregister_compacting_sstables(sstables_to_compact);
if (weight != -1) {
deregister_weight(task->compacting_cf, weight);
}

View File

@@ -93,6 +93,9 @@ private:
// Get candidates for compaction strategy, which are all sstables but the ones being compacted.
std::vector<sstables::shared_sstable> get_candidates(const column_family& cf);
void register_compacting_sstables(const std::vector<sstables::shared_sstable>& sstables);
void deregister_compacting_sstables(const std::vector<sstables::shared_sstable>& sstables);
public:
compaction_manager();
~compaction_manager();