compaction: implement date tiered compaction strategy options

Now date tiered compaction strategy will take into account the
strategy options which are defined in the schema.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit eaa6e281a2)
This commit is contained in:
Raphael S. Carvalho
2016-07-18 22:54:34 -03:00
committed by Pekka Enberg
parent af7c0f6433
commit 789fb0db97

View File

@@ -46,26 +46,97 @@
#include <algorithm>
#include <vector>
#include <iterator>
#include <boost/algorithm/string/case_conv.hpp>
#include "sstables.hh"
#include "compaction.hh"
#include "timestamp.hh"
#include "cql3/statements/property_definitions.hh"
static constexpr double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
static constexpr int64_t DEFAULT_BASE_TIME_SECONDS = 60;
struct duration_conversor {
// Convert given duration to TargetDuration and return value as timestamp.
template <typename TargetDuration, typename SourceDuration>
static api::timestamp_type convert(SourceDuration d) {
return std::chrono::duration_cast<TargetDuration>(d).count();
}
// Convert given duration to duration that is represented by the string
// target_duration, and return value as timestamp.
template <typename SourceDuration>
static api::timestamp_type convert(const sstring& target_duration, SourceDuration d) {
if (target_duration == "HOURS") {
return convert<std::chrono::hours>(d);
} else if (target_duration == "MICROSECONDS") {
return convert<std::chrono::microseconds>(d);
} else if (target_duration == "MILLISECONDS") {
return convert<std::chrono::milliseconds>(d);
} else if (target_duration == "MINUTES") {
return convert<std::chrono::minutes>(d);
} else if (target_duration == "NANOSECONDS") {
return convert<std::chrono::nanoseconds>(d);
} else if (target_duration == "SECONDS") {
return convert<std::chrono::seconds>(d);
} else {
throw std::runtime_error(sprint("target duration %s is not available", target_duration));
}
}
};
class date_tiered_compaction_strategy_options {
const sstring DEFAULT_TIMESTAMP_RESOLUTION = "MICROSECONDS";
const sstring TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
const sstring MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
const sstring BASE_TIME_KEY = "base_time_seconds";
api::timestamp_type max_sstable_age;
api::timestamp_type base_time;
public:
date_tiered_compaction_strategy_options(const std::map<sstring, sstring>& options) {
using namespace cql3::statements;
auto tmp_value = get_value(options, TIMESTAMP_RESOLUTION_KEY);
auto target_unit = tmp_value ? tmp_value.value() : DEFAULT_TIMESTAMP_RESOLUTION;
boost::to_upper(target_unit);
tmp_value = get_value(options, MAX_SSTABLE_AGE_KEY);
auto fractional_days = property_definitions::to_double(MAX_SSTABLE_AGE_KEY, tmp_value, DEFAULT_MAX_SSTABLE_AGE_DAYS);
int64_t max_sstable_age_in_hours = std::lround(fractional_days * 24);
max_sstable_age = duration_conversor::convert(target_unit, std::chrono::hours(max_sstable_age_in_hours));
tmp_value = get_value(options, BASE_TIME_KEY);
auto base_time_seconds = property_definitions::to_long(BASE_TIME_KEY, tmp_value, DEFAULT_BASE_TIME_SECONDS);
base_time = duration_conversor::convert(target_unit, std::chrono::seconds(base_time_seconds));
}
date_tiered_compaction_strategy_options() {
auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
max_sstable_age = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::hours(max_sstable_age_in_hours)).count();
base_time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS)).count();
}
private:
static std::experimental::optional<sstring> get_value(const std::map<sstring, sstring>& options, const sstring& name) {
auto it = options.find(name);
if (it == options.end()) {
return std::experimental::nullopt;
}
return it->second;
}
friend class date_tiered_manifest;
};
class date_tiered_manifest {
static logging::logger logger;
// TODO: implement date_tiered_compaction_strategy_options.
db_clock::duration _max_sstable_age;
db_clock::duration _base_time;
date_tiered_compaction_strategy_options _options;
public:
date_tiered_manifest() = delete;
date_tiered_manifest(const std::map<sstring, sstring>& options) {
auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
_max_sstable_age = std::chrono::duration_cast<db_clock::duration>(std::chrono::hours(max_sstable_age_in_hours));
_base_time = std::chrono::duration_cast<db_clock::duration>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS));
date_tiered_manifest(const std::map<sstring, sstring>& options)
: _options(options)
{
// FIXME: implement option to disable tombstone compaction.
#if 0
if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
@@ -119,8 +190,8 @@ public:
for (auto& entry : *cf.get_sstables()) {
sstables.push_back(entry);
}
auto candidates = filter_old_sstables(sstables, _max_sstable_age, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
auto candidates = filter_old_sstables(sstables, _options.max_sstable_age, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now);
for (auto& bucket : buckets) {
if (bucket.size() >= size_t(cf.schema()->min_compaction_threshold())) {
@@ -161,11 +232,11 @@ private:
get_compaction_candidates(column_family& cf, std::vector<sstables::shared_sstable> candidate_sstables, int64_t now, int base) {
int min_threshold = cf.schema()->min_compaction_threshold();
int max_threshold = cf.schema()->max_compaction_threshold();
auto candidates = filter_old_sstables(candidate_sstables, _max_sstable_age, now);
auto candidates = filter_old_sstables(candidate_sstables, _options.max_sstable_age, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now);
return newest_bucket(buckets, min_threshold, max_threshold, now, _base_time);
return newest_bucket(buckets, min_threshold, max_threshold, now, _options.base_time);
}
/**
@@ -186,12 +257,11 @@ private:
* @return a list of sstables with the oldest sstables excluded
*/
static std::vector<sstables::shared_sstable>
filter_old_sstables(std::vector<sstables::shared_sstable> sstables, db_clock::duration max_sstable_age, int64_t now) {
int64_t max_sstable_age_count = std::chrono::duration_cast<std::chrono::microseconds>(max_sstable_age).count();
if (max_sstable_age_count == 0) {
filter_old_sstables(std::vector<sstables::shared_sstable> sstables, api::timestamp_type max_sstable_age, int64_t now) {
if (max_sstable_age == 0) {
return sstables;
}
int64_t cutoff = now - max_sstable_age_count;
int64_t cutoff = now - max_sstable_age;
sstables.erase(std::remove_if(sstables.begin(), sstables.end(), [cutoff] (auto& sst) {
return sst->get_stats_metadata().max_timestamp < cutoff;
@@ -275,14 +345,14 @@ private:
* Each bucket is also a list of files ordered from newest to oldest.
*/
std::vector<std::vector<sstables::shared_sstable>>
get_buckets(std::vector<std::pair<sstables::shared_sstable,int64_t>>&& files, db_clock::duration time_unit, int base, int64_t now) const {
get_buckets(std::vector<std::pair<sstables::shared_sstable,int64_t>>&& files, api::timestamp_type time_unit, int base, int64_t now) const {
// Sort files by age. Newest first.
std::sort(files.begin(), files.end(), [] (auto& i, auto& j) {
return i.second > j.second;
});
std::vector<std::vector<sstables::shared_sstable>> buckets;
auto target = get_initial_target(now, std::chrono::duration_cast<std::chrono::microseconds>(time_unit).count());
auto target = get_initial_target(now, time_unit);
auto it = files.begin();
while (it != files.end()) {
@@ -329,12 +399,12 @@ private:
*/
std::vector<sstables::shared_sstable>
newest_bucket(std::vector<std::vector<sstables::shared_sstable>>& buckets, int min_threshold, int max_threshold,
int64_t now, db_clock::duration base_time) {
int64_t now, api::timestamp_type base_time) {
// If the "incoming window" has at least minThreshold SSTables, choose that one.
// For any other bucket, at least 2 SSTables is enough.
// In any case, limit to maxThreshold SSTables.
target incoming_window = get_initial_target(now, std::chrono::duration_cast<std::chrono::microseconds>(base_time).count());
target incoming_window = get_initial_target(now, base_time);
for (auto& bucket : buckets) {
auto min_timestamp = bucket.front()->get_stats_metadata().min_timestamp;
if (bucket.size() >= size_t(min_threshold) ||