diff --git a/sstables/date_tiered_compaction_strategy.hh b/sstables/date_tiered_compaction_strategy.hh index 5bd88b5c54..9ab9ba9cc9 100644 --- a/sstables/date_tiered_compaction_strategy.hh +++ b/sstables/date_tiered_compaction_strategy.hh @@ -46,26 +46,97 @@ #include #include #include +#include #include "sstables.hh" #include "compaction.hh" +#include "timestamp.hh" +#include "cql3/statements/property_definitions.hh" static constexpr double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365; static constexpr int64_t DEFAULT_BASE_TIME_SECONDS = 60; +struct duration_conversor { + // Convert given duration to TargetDuration and return value as timestamp. + template + static api::timestamp_type convert(SourceDuration d) { + return std::chrono::duration_cast(d).count(); + } + + // Convert given duration to duration that is represented by the string + // target_duration, and return value as timestamp. + template + static api::timestamp_type convert(const sstring& target_duration, SourceDuration d) { + if (target_duration == "HOURS") { + return convert(d); + } else if (target_duration == "MICROSECONDS") { + return convert(d); + } else if (target_duration == "MILLISECONDS") { + return convert(d); + } else if (target_duration == "MINUTES") { + return convert(d); + } else if (target_duration == "NANOSECONDS") { + return convert(d); + } else if (target_duration == "SECONDS") { + return convert(d); + } else { + throw std::runtime_error(sprint("target duration %s is not available", target_duration)); + } + } +}; + +class date_tiered_compaction_strategy_options { + const sstring DEFAULT_TIMESTAMP_RESOLUTION = "MICROSECONDS"; + const sstring TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution"; + const sstring MAX_SSTABLE_AGE_KEY = "max_sstable_age_days"; + const sstring BASE_TIME_KEY = "base_time_seconds"; + + api::timestamp_type max_sstable_age; + api::timestamp_type base_time; +public: + date_tiered_compaction_strategy_options(const std::map& options) { + using namespace cql3::statements; + + auto tmp_value = get_value(options, TIMESTAMP_RESOLUTION_KEY); + auto target_unit = tmp_value ? tmp_value.value() : DEFAULT_TIMESTAMP_RESOLUTION; + boost::to_upper(target_unit); + + tmp_value = get_value(options, MAX_SSTABLE_AGE_KEY); + auto fractional_days = property_definitions::to_double(MAX_SSTABLE_AGE_KEY, tmp_value, DEFAULT_MAX_SSTABLE_AGE_DAYS); + int64_t max_sstable_age_in_hours = std::lround(fractional_days * 24); + max_sstable_age = duration_conversor::convert(target_unit, std::chrono::hours(max_sstable_age_in_hours)); + + tmp_value = get_value(options, BASE_TIME_KEY); + auto base_time_seconds = property_definitions::to_long(BASE_TIME_KEY, tmp_value, DEFAULT_BASE_TIME_SECONDS); + base_time = duration_conversor::convert(target_unit, std::chrono::seconds(base_time_seconds)); + } + + date_tiered_compaction_strategy_options() { + auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24); + max_sstable_age = std::chrono::duration_cast(std::chrono::hours(max_sstable_age_in_hours)).count(); + base_time = std::chrono::duration_cast(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS)).count(); + } +private: + static std::experimental::optional get_value(const std::map& options, const sstring& name) { + auto it = options.find(name); + if (it == options.end()) { + return std::experimental::nullopt; + } + return it->second; + } + + friend class date_tiered_manifest; +}; + class date_tiered_manifest { static logging::logger logger; - // TODO: implement date_tiered_compaction_strategy_options. - db_clock::duration _max_sstable_age; - db_clock::duration _base_time; + date_tiered_compaction_strategy_options _options; public: date_tiered_manifest() = delete; - date_tiered_manifest(const std::map& options) { - auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24); - _max_sstable_age = std::chrono::duration_cast(std::chrono::hours(max_sstable_age_in_hours)); - _base_time = std::chrono::duration_cast(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS)); - + date_tiered_manifest(const std::map& options) + : _options(options) + { // FIXME: implement option to disable tombstone compaction. #if 0 if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION)) @@ -119,8 +190,8 @@ public: for (auto& entry : *cf.get_sstables()) { sstables.push_back(entry); } - auto candidates = filter_old_sstables(sstables, _max_sstable_age, now); - auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now); + auto candidates = filter_old_sstables(sstables, _options.max_sstable_age, now); + auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now); for (auto& bucket : buckets) { if (bucket.size() >= size_t(cf.schema()->min_compaction_threshold())) { @@ -161,11 +232,11 @@ private: get_compaction_candidates(column_family& cf, std::vector candidate_sstables, int64_t now, int base) { int min_threshold = cf.schema()->min_compaction_threshold(); int max_threshold = cf.schema()->max_compaction_threshold(); - auto candidates = filter_old_sstables(candidate_sstables, _max_sstable_age, now); + auto candidates = filter_old_sstables(candidate_sstables, _options.max_sstable_age, now); - auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now); + auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now); - return newest_bucket(buckets, min_threshold, max_threshold, now, _base_time); + return newest_bucket(buckets, min_threshold, max_threshold, now, _options.base_time); } /** @@ -186,12 +257,11 @@ private: * @return a list of sstables with the oldest sstables excluded */ static std::vector - filter_old_sstables(std::vector sstables, db_clock::duration max_sstable_age, int64_t now) { - int64_t max_sstable_age_count = std::chrono::duration_cast(max_sstable_age).count(); - if (max_sstable_age_count == 0) { + filter_old_sstables(std::vector sstables, api::timestamp_type max_sstable_age, int64_t now) { + if (max_sstable_age == 0) { return sstables; } - int64_t cutoff = now - max_sstable_age_count; + int64_t cutoff = now - max_sstable_age; sstables.erase(std::remove_if(sstables.begin(), sstables.end(), [cutoff] (auto& sst) { return sst->get_stats_metadata().max_timestamp < cutoff; @@ -275,14 +345,14 @@ private: * Each bucket is also a list of files ordered from newest to oldest. */ std::vector> - get_buckets(std::vector>&& files, db_clock::duration time_unit, int base, int64_t now) const { + get_buckets(std::vector>&& files, api::timestamp_type time_unit, int base, int64_t now) const { // Sort files by age. Newest first. std::sort(files.begin(), files.end(), [] (auto& i, auto& j) { return i.second > j.second; }); std::vector> buckets; - auto target = get_initial_target(now, std::chrono::duration_cast(time_unit).count()); + auto target = get_initial_target(now, time_unit); auto it = files.begin(); while (it != files.end()) { @@ -329,12 +399,12 @@ private: */ std::vector newest_bucket(std::vector>& buckets, int min_threshold, int max_threshold, - int64_t now, db_clock::duration base_time) { + int64_t now, api::timestamp_type base_time) { // If the "incoming window" has at least minThreshold SSTables, choose that one. // For any other bucket, at least 2 SSTables is enough. // In any case, limit to maxThreshold SSTables. - target incoming_window = get_initial_target(now, std::chrono::duration_cast(base_time).count()); + target incoming_window = get_initial_target(now, base_time); for (auto& bucket : buckets) { auto min_timestamp = bucket.front()->get_stats_metadata().min_timestamp; if (bucket.size() >= size_t(min_threshold) ||