compaction_manager: collect all cleanup related logic in perform_cleanup()

Currently the call chain for a cleanup collection looks like this:
compaction_manager::perform_cleanup()
    compaction_manager::rewrite_sstables()
        table::cleanup_sstables()
            ...

`perform_cleanup()` is essentially empty, immediately deferring to
`rewrite_sstables()`. Cleanup related logic is scattered between the
latter two methods on the call chain. These methods, however, have
recently started serving as generic methods for compactions that want
to rewrite each sstable one-by-one, so cleanup-related `if`s have
accumulated in various places along the chain.
The reason is historical: we first had cleanup, then bolted the others
on top, trying to share the underlying code as much as possible.

It is time this is cleaned up (pun intended). Make `perform_cleanup()`
the place where all cleanup related logic is, with the rest of the stack
made truly generic.
This commit is contained in:
Botond Dénes
2020-02-04 16:06:40 +02:00
parent b2dc5d4895
commit 8014c7124d
4 changed files with 52 additions and 49 deletions

View File

@@ -827,15 +827,12 @@ public:
// not a real compaction policy.
future<> compact_all_sstables();
// Compact all sstables provided in the vector.
// If descriptor.cleanup is set to true, compaction_sstables will run on behalf of a cleanup job,
// meaning that irrelevant keys will be discarded.
future<> compact_sstables(sstables::compaction_descriptor descriptor);
// Performs a cleanup on each sstable of this column family, excluding
// those ones that are irrelevant to this node or being compacted.
// Cleanup is about discarding keys that are no longer relevant for a
// given sstable, e.g. after node loses part of its token range because
// of a newly added node.
future<> cleanup_sstables(sstables::compaction_descriptor descriptor);
// Compact all sstables provided in the descriptor one-by-one.
//
// Will call `compact_sstables()` for each sstable. Use by compaction
// types such as cleanup or upgrade.
future<> rewrite_sstables(sstables::compaction_descriptor descriptor);
future<bool> snapshot_exists(sstring name);

View File

@@ -24,6 +24,7 @@
#include "compaction_backlog_manager.hh"
#include "sstables/sstables.hh"
#include "database.hh"
#include "service/storage_service.hh"
#include <seastar/core/metrics.hh>
#include "exceptions.hh"
#include <cmath>
@@ -585,10 +586,6 @@ inline bool compaction_manager::check_for_cleanup(column_family* cf) {
}
future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func get_func) {
if (options.type() == sstables::compaction_type::Cleanup && check_for_cleanup(cf)) {
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name()));
}
auto task = make_lw_shared<compaction_manager::task>();
task->compacting_cf = cf;
task->type = options.type();
@@ -616,7 +613,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
return cf.cleanup_sstables(std::move(descriptor));
return cf.rewrite_sstables(std::move(descriptor));
});
}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
task->compaction_running = false;
@@ -643,8 +640,39 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
return task->compaction_done.get_future().then([task] {});
}
// Decide whether cleanup has to rewrite `sst`.
// Returns true unless the sstable's token span (first..last partition key)
// is fully contained in one of the token ranges owned by this node —
// i.e. false means the sstable holds no foreign keys and can be skipped.
// NOTE(review): callers are expected to handle the empty `owned_ranges`
// case themselves (the caller below guards with `owned_ranges.empty() ||`).
static bool needs_cleanup(const sstables::shared_sstable& sst,
const dht::token_range_vector& owned_ranges,
schema_ptr s) {
// Compute the closed token range covered by this sstable from its
// first and last partition keys.
auto first = sst->get_first_partition_key();
auto last = sst->get_last_partition_key();
auto first_token = dht::global_partitioner().get_token(*s, first);
auto last_token = dht::global_partitioner().get_token(*s, last);
dht::token_range sst_token_range = dht::token_range::make(first_token, last_token);
// return true iff sst partition range isn't fully contained in any of the owned ranges.
for (auto& r : owned_ranges) {
if (r.contains(sst_token_range, dht::token_comparator())) {
return false;
}
}
return true;
}
future<> compaction_manager::perform_cleanup(column_family* cf) {
    // All cleanup-specific logic is concentrated here; rewrite_sstables()
    // below is the generic rewrite-one-by-one machinery.
    //
    // Refuse to start a second concurrent cleanup on the same table.
    if (check_for_cleanup(cf)) {
        throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
            cf->schema()->ks_name(), cf->schema()->cf_name()));
    }
    // The candidate-selection callback keeps only sstables that may contain
    // keys outside this node's owned token ranges.
    return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), [this] (const table& table) {
        auto schema = table.schema();
        auto owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
        auto sstables = std::vector<sstables::shared_sstable>{};
        const auto candidates = table.candidates_for_compaction();
        // If the owned-ranges vector is empty we cannot prove any sstable is
        // clean, so every candidate is included; otherwise skip sstables
        // fully contained in the owned ranges.
        std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&owned_ranges, schema] (const sstables::shared_sstable& sst) {
            return owned_ranges.empty() || needs_cleanup(sst, owned_ranges, schema);
        });
        return sstables;
    });
}
sstables::sstable::version_types get_highest_supported_format();

View File

@@ -169,6 +169,12 @@ public:
void submit(column_family* cf);
// Submit a column family to be cleaned up and wait for its termination.
//
// Performs a cleanup on each sstable of the column family, excluding
// those ones that are irrelevant to this node or being compacted.
// Cleanup is about discarding keys that are no longer relevant for a
// given sstable, e.g. after node loses part of its token range because
// of a newly added node.
future<> perform_cleanup(column_family* cf);
// Submit a column family to be upgraded and wait for its termination.

View File

@@ -1319,43 +1319,15 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
});
}
// Decide whether cleanup has to rewrite `sst`.
// Returns true unless the sstable's token span (first..last partition key)
// is fully contained in one of the token ranges owned by this node —
// i.e. false means the sstable holds no foreign keys and can be skipped.
static bool needs_cleanup(const sstables::shared_sstable& sst,
const dht::token_range_vector& owned_ranges,
schema_ptr s) {
// Compute the closed token range covered by this sstable from its
// first and last partition keys.
auto first = sst->get_first_partition_key();
auto last = sst->get_last_partition_key();
auto first_token = dht::global_partitioner().get_token(*s, first);
auto last_token = dht::global_partitioner().get_token(*s, last);
dht::token_range sst_token_range = dht::token_range::make(first_token, last_token);
// return true iff sst partition range isn't fully contained in any of the owned ranges.
for (auto& r : owned_ranges) {
if (r.contains(sst_token_range, dht::token_comparator())) {
return false;
}
}
return true;
}
future<> table::cleanup_sstables(sstables::compaction_descriptor descriptor) {
dht::token_range_vector r;
if (descriptor.options.type() == sstables::compaction_type::Cleanup) {
r = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
}
return do_with(std::move(descriptor.sstables), std::move(r), std::move(descriptor.release_exhausted),
[this, options = descriptor.options] (auto& sstables, auto& owned_ranges, auto& release_fn) {
return do_for_each(sstables, [this, &owned_ranges, &release_fn, options] (auto& sst) {
if (options.type() == sstables::compaction_type::Cleanup && !owned_ranges.empty() && !needs_cleanup(sst, owned_ranges, _schema)) {
return make_ready_future<>();
}
// this semaphore ensures that only one cleanup will run per shard.
future<> table::rewrite_sstables(sstables::compaction_descriptor descriptor) {
return do_with(std::move(descriptor.sstables), std::move(descriptor.release_exhausted),
[this, options = descriptor.options] (auto& sstables, auto& release_fn) {
return do_for_each(sstables, [this, &release_fn, options] (auto& sst) {
// this semaphore ensures that only one rewrite will run per shard.
// That's to prevent node from running out of space when almost all sstables
// need cleanup, so if sstables are cleaned in parallel, we may need almost
// need rewrite, so if sstables are rewritten in parallel, we may need almost
// twice the disk space used by those sstables.
static thread_local named_semaphore sem(1, named_semaphore_exception_factory{"cleanup sstables"});
static thread_local named_semaphore sem(1, named_semaphore_exception_factory{"rewrite sstables"});
return with_semaphore(sem, 1, [this, &sst, &release_fn, options] {
// release reference to sstables cleaned up, otherwise space usage from their data and index