sstables: prepare compact_sstables to work with cleanup

Cleanup is about rewriting a sstable discarding any keys that
are irrelevant, i.e. keys that don't belong to current node.
Parameter cleanup was added to compact_sstables.
If set to true, irrelevant code such as the one that updates
compaction history will be skipped. Logic was also added to
discard irrelevant keys.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2016-01-05 15:35:40 -02:00
parent 5c674091dc
commit ed80ed82ef
2 changed files with 59 additions and 8 deletions

View File

@@ -42,6 +42,7 @@
#include <functional>
#include <utility>
#include <assert.h>
#include <algorithm>
#include <boost/range/algorithm.hpp>
#include <boost/range/adaptors.hpp>
@@ -59,6 +60,7 @@
#include "leveled_manifest.hh"
#include "db/system_keyspace.hh"
#include "db/query_context.hh"
#include "service/storage_service.hh"
namespace sstables {
@@ -87,11 +89,26 @@ static api::timestamp_type get_max_purgeable_timestamp(schema_ptr schema,
return timestamp;
}
static bool belongs_to_current_node(const dht::token& t, const std::vector<range<dht::token>>& sorted_owned_ranges) {
auto low = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), t,
[] (const range<dht::token>& a, const dht::token b) {
// check that range a is before token b.
return a.after(b, dht::token_comparator());
});
if (low != sorted_owned_ranges.end()) {
const range<dht::token>& r = *low;
return r.contains(t, dht::token_comparator());
}
return false;
}
// compact_sstables compacts the given list of sstables creating one
// (currently) or more (in the future) new sstables. The new sstables
// are created using the "sstable_creator" object passed by the caller.
future<> compact_sstables(std::vector<shared_sstable> sstables,
column_family& cf, std::function<shared_sstable()> creator, uint64_t max_sstable_size, uint32_t sstable_level) {
future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable()> creator,
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup) {
std::vector<::mutation_reader> readers;
uint64_t estimated_partitions = 0;
auto ancestors = make_lw_shared<std::vector<unsigned long>>();
@@ -146,7 +163,7 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
stats->sstables = sstables.size();
stats->ks = schema->ks_name();
stats->cf = schema->cf_name();
logger.info("Compacting {}", sstable_logger_msg);
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
class compacting_reader final : public ::mutation_reader::impl {
private:
@@ -154,12 +171,17 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
::mutation_reader _reader;
std::vector<shared_sstable> _not_compacted_sstables;
gc_clock::time_point _now;
std::vector<range<dht::token>> _sorted_owned_ranges;
bool _cleanup;
public:
compacting_reader(schema_ptr schema, std::vector<::mutation_reader> readers, std::vector<shared_sstable> not_compacted_sstables)
compacting_reader(schema_ptr schema, std::vector<::mutation_reader> readers, std::vector<shared_sstable> not_compacted_sstables,
std::vector<range<dht::token>> sorted_owned_ranges, bool cleanup)
: _schema(std::move(schema))
, _reader(make_combined_reader(std::move(readers)))
, _not_compacted_sstables(std::move(not_compacted_sstables))
, _now(gc_clock::now())
, _sorted_owned_ranges(std::move(sorted_owned_ranges))
, _cleanup(cleanup)
{ }
virtual future<mutation_opt> operator()() override {
@@ -167,6 +189,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
if (!bool(m)) {
return make_ready_future<mutation_opt>(std::move(m));
}
if (_cleanup && !belongs_to_current_node(m->token(), _sorted_owned_ranges)) {
return operator()();
}
auto max_purgeable = get_max_purgeable_timestamp(_schema, _not_compacted_sstables, m->decorated_key());
m->partition().compact_for_compaction(*_schema, max_purgeable, _now);
if (!m->partition().empty()) {
@@ -176,7 +201,24 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
});
}
};
auto reader = make_mutation_reader<compacting_reader>(schema, std::move(readers), std::move(not_compacted_sstables));
std::vector<range<dht::token>> owned_ranges;
if (cleanup) {
owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
// sort owned ranges
std::sort(owned_ranges.begin(), owned_ranges.end(), [](range<dht::token>& a, range<dht::token>& b) {
if (!a.start()) {
return true;
}
if (!b.start()) {
return false;
}
const dht::token& a_start = a.start()->value();
const dht::token& b_start = b.start()->value();
return a_start < b_start;
});
}
auto reader = make_mutation_reader<compacting_reader>(schema, std::move(readers), std::move(not_compacted_sstables),
std::move(owned_ranges), cleanup);
auto start_time = std::chrono::steady_clock::now();
@@ -268,7 +310,7 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
if (ex.size()) {
throw std::runtime_error(ex);
}
}).then([start_time, stats] {
}).then([start_time, stats, cleanup] {
double ratio = double(stats->end_size) / double(stats->start_size);
auto end_time = std::chrono::steady_clock::now();
// time taken by compaction in seconds.
@@ -285,8 +327,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
// - add support to merge summary (message: Partition merge counts were {%s}.).
// - there is no easy way, currently, to know the exact number of total partitions.
// By the time being, using estimated key count.
logger.info("Compacted {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \
logger.info("{} {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \
"~{} total partitions merged to {}.",
(!cleanup) ? "Compacted" : "Cleaned",
stats->sstables, new_sstables_msg, stats->start_size, stats->end_size, (int) (ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), throughput,
stats->total_partitions, stats->total_keys_written);
@@ -297,6 +340,11 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
return make_ready_future<>();
}
// Skip code that updates compaction history if running on behalf of a cleanup job.
if (cleanup) {
return make_ready_future<>();
}
auto compacted_at = std::chrono::duration_cast<std::chrono::milliseconds>(end_time.time_since_epoch()).count();
// FIXME: add support to merged_rows. merged_rows is a histogram that

View File

@@ -63,9 +63,12 @@ namespace sstables {
// Example: It's okay for the size of a new sstable to go beyond max_sstable_size
// when writing its last partition.
// sstable_level will be level of the sstable(s) to be created by this function.
// If cleanup is true, mutation that doesn't belong to current node will be
// cleaned up, log messages will inform the user that compact_sstables runs for
// cleaning operation, and compaction history will not be updated.
future<> compact_sstables(std::vector<shared_sstable> sstables,
column_family& cf, std::function<shared_sstable()> creator,
uint64_t max_sstable_size, uint32_t sstable_level);
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false);
// Return the most interesting bucket applying the size-tiered strategy.
// NOTE: currently used for purposes of testing. May also be used by leveled compaction strategy.