compaction_manager: fix indentation

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <82c6b93b24cbcc97f5eff3f91b05d4c1b415ecee.1462412927.git.raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2016-05-04 22:48:51 -03:00
committed by Avi Kivity
parent 3aefa4f1d2
commit b8277979ef

View File

@@ -120,89 +120,89 @@ void compaction_manager::task_start(column_family* cf, bool cleanup) {
_stats.pending_tasks++;
task->compaction_done = repeat([this, task] {
return seastar::with_gate(task->compaction_gate, [this, task] {
if (_stopped || task->stopping) {
_stats.pending_tasks--;
return make_ready_future<>();
return seastar::with_gate(task->compaction_gate, [this, task] {
if (_stopped || task->stopping) {
_stats.pending_tasks--;
return make_ready_future<>();
}
if (!task->cleanup && task->compacting_cf->pending_compactions() == 0) {
task->stopping = true;
_stats.pending_tasks--;
return make_ready_future<>();
}
column_family& cf = *task->compacting_cf;
std::vector<sstables::shared_sstable> candidates; // candidates for compaction
candidates.reserve(cf.sstables_count());
// Filter out sstables that are being compacted.
for (auto& entry : *cf.get_sstables()) {
auto& sst = entry.second;
if (!_compacting_sstables.count(sst)) {
candidates.push_back(sst);
}
if (!task->cleanup && task->compacting_cf->pending_compactions() == 0) {
}
sstables::compaction_descriptor descriptor;
// Created to erase sstables from _compacting_sstables after compaction finishes.
std::vector<sstables::shared_sstable> sstables_to_compact;
int weight = -1;
auto keep_track_of_compacting_sstables = [this, &sstables_to_compact, &descriptor] {
sstables_to_compact.reserve(descriptor.sstables.size());
for (auto& sst : descriptor.sstables) {
sstables_to_compact.push_back(sst);
_compacting_sstables.insert(sst);
}
};
future<> operation = make_ready_future<>();
if (task->cleanup) {
descriptor = sstables::compaction_descriptor(std::move(candidates));
keep_track_of_compacting_sstables();
operation = cf.cleanup_sstables(std::move(descriptor));
} else {
sstables::compaction_strategy cs = cf.get_compaction_strategy();
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
weight = trim_to_compact(&cf, descriptor);
if (!try_to_register_weight(&cf, weight)) {
// Refusing compaction job because of an ongoing compaction with same weight.
task->stopping = true;
_stats.pending_tasks--;
return make_ready_future<>();
}
column_family& cf = *task->compacting_cf;
std::vector<sstables::shared_sstable> candidates; // candidates for compaction
candidates.reserve(cf.sstables_count());
// Filter out sstables that are being compacted.
for (auto& entry : *cf.get_sstables()) {
auto& sst = entry.second;
if (!_compacting_sstables.count(sst)) {
candidates.push_back(sst);
}
}
sstables::compaction_descriptor descriptor;
// Created to erase sstables from _compacting_sstables after compaction finishes.
std::vector<sstables::shared_sstable> sstables_to_compact;
int weight = -1;
auto keep_track_of_compacting_sstables = [this, &sstables_to_compact, &descriptor] {
sstables_to_compact.reserve(descriptor.sstables.size());
for (auto& sst : descriptor.sstables) {
sstables_to_compact.push_back(sst);
_compacting_sstables.insert(sst);
}
};
future<> operation = make_ready_future<>();
if (task->cleanup) {
descriptor = sstables::compaction_descriptor(std::move(candidates));
keep_track_of_compacting_sstables();
operation = cf.cleanup_sstables(std::move(descriptor));
} else {
sstables::compaction_strategy cs = cf.get_compaction_strategy();
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
weight = trim_to_compact(&cf, descriptor);
if (!try_to_register_weight(&cf, weight)) {
// Refusing compaction job because of an ongoing compaction with same weight.
task->stopping = true;
_stats.pending_tasks--;
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
return make_ready_future<>();
}
keep_track_of_compacting_sstables();
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
operation = cf.run_compaction(std::move(descriptor));
}
_stats.pending_tasks--;
_stats.active_tasks++;
return operation.then([this, task] {
_stats.completed_tasks++;
task->compaction_retry.reset();
if (task->cleanup) {
auto it = _cleanup_waiters.find(task->compacting_cf);
if (it != _cleanup_waiters.end()) {
// Notificate waiter of cleanup termination.
it->second.set_value();
_cleanup_waiters.erase(it);
}
}
return make_ready_future<>();
}).finally([this, task, weight, sstables_to_compact = std::move(sstables_to_compact)] {
// Remove compacted sstables from the set of compacting sstables.
for (auto& sst : sstables_to_compact) {
_compacting_sstables.erase(sst);
}
keep_track_of_compacting_sstables();
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name());
operation = cf.run_compaction(std::move(descriptor));
}
_stats.pending_tasks--;
_stats.active_tasks++;
return operation.then([this, task] {
_stats.completed_tasks++;
task->compaction_retry.reset();
if (task->cleanup) {
auto it = _cleanup_waiters.find(task->compacting_cf);
if (it != _cleanup_waiters.end()) {
// Notificate waiter of cleanup termination.
it->second.set_value();
_cleanup_waiters.erase(it);
}
if (weight != -1) {
deregister_weight(task->compacting_cf, weight);
}
_stats.active_tasks--;
});
}
return make_ready_future<>();
}).finally([this, task, weight, sstables_to_compact = std::move(sstables_to_compact)] {
// Remove compacted sstables from the set of compacting sstables.
for (auto& sst : sstables_to_compact) {
_compacting_sstables.erase(sst);
}
if (weight != -1) {
deregister_weight(task->compacting_cf, weight);
}
_stats.active_tasks--;
});
}).then_wrapped([this, task] (future<> f) {
bool retry = false;