diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 2c999d9d0e..db1e8737b6 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include #include @@ -59,6 +60,7 @@ #include "leveled_manifest.hh" #include "db/system_keyspace.hh" #include "db/query_context.hh" +#include "service/storage_service.hh" namespace sstables { @@ -87,11 +89,26 @@ static api::timestamp_type get_max_purgeable_timestamp(schema_ptr schema, return timestamp; } +static bool belongs_to_current_node(const dht::token& t, const std::vector>& sorted_owned_ranges) { + auto low = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), t, + [] (const range& a, const dht::token b) { + // check that range a is before token b. + return a.after(b, dht::token_comparator()); + }); + + if (low != sorted_owned_ranges.end()) { + const range& r = *low; + return r.contains(t, dht::token_comparator()); + } + + return false; +} + // compact_sstables compacts the given list of sstables creating one // (currently) or more (in the future) new sstables. The new sstables // are created using the "sstable_creator" object passed by the caller. -future<> compact_sstables(std::vector sstables, - column_family& cf, std::function creator, uint64_t max_sstable_size, uint32_t sstable_level) { +future<> compact_sstables(std::vector sstables, column_family& cf, std::function creator, + uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup) { std::vector<::mutation_reader> readers; uint64_t estimated_partitions = 0; auto ancestors = make_lw_shared>(); @@ -146,7 +163,7 @@ future<> compact_sstables(std::vector sstables, stats->sstables = sstables.size(); stats->ks = schema->ks_name(); stats->cf = schema->cf_name(); - logger.info("Compacting {}", sstable_logger_msg); + logger.info("{} {}", (!cleanup) ? 
"Compacting" : "Cleaning", sstable_logger_msg); class compacting_reader final : public ::mutation_reader::impl { private: @@ -154,12 +171,17 @@ future<> compact_sstables(std::vector sstables, ::mutation_reader _reader; std::vector _not_compacted_sstables; gc_clock::time_point _now; + std::vector> _sorted_owned_ranges; + bool _cleanup; public: - compacting_reader(schema_ptr schema, std::vector<::mutation_reader> readers, std::vector not_compacted_sstables) + compacting_reader(schema_ptr schema, std::vector<::mutation_reader> readers, std::vector not_compacted_sstables, + std::vector> sorted_owned_ranges, bool cleanup) : _schema(std::move(schema)) , _reader(make_combined_reader(std::move(readers))) , _not_compacted_sstables(std::move(not_compacted_sstables)) , _now(gc_clock::now()) + , _sorted_owned_ranges(std::move(sorted_owned_ranges)) + , _cleanup(cleanup) { } virtual future operator()() override { @@ -167,6 +189,9 @@ future<> compact_sstables(std::vector sstables, if (!bool(m)) { return make_ready_future(std::move(m)); } + if (_cleanup && !belongs_to_current_node(m->token(), _sorted_owned_ranges)) { + return operator()(); + } auto max_purgeable = get_max_purgeable_timestamp(_schema, _not_compacted_sstables, m->decorated_key()); m->partition().compact_for_compaction(*_schema, max_purgeable, _now); if (!m->partition().empty()) { @@ -176,7 +201,24 @@ future<> compact_sstables(std::vector sstables, }); } }; - auto reader = make_mutation_reader(schema, std::move(readers), std::move(not_compacted_sstables)); + std::vector> owned_ranges; + if (cleanup) { + owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name()); + // sort owned ranges + std::sort(owned_ranges.begin(), owned_ranges.end(), [](range& a, range& b) { + if (!a.start()) { + return true; + } + if (!b.start()) { + return false; + } + const dht::token& a_start = a.start()->value(); + const dht::token& b_start = b.start()->value(); + return a_start < b_start; + }); + } + auto reader 
= make_mutation_reader(schema, std::move(readers), std::move(not_compacted_sstables), + std::move(owned_ranges), cleanup); auto start_time = std::chrono::steady_clock::now(); @@ -268,7 +310,7 @@ future<> compact_sstables(std::vector sstables, if (ex.size()) { throw std::runtime_error(ex); } - }).then([start_time, stats] { + }).then([start_time, stats, cleanup] { double ratio = double(stats->end_size) / double(stats->start_size); auto end_time = std::chrono::steady_clock::now(); // time taken by compaction in seconds. @@ -285,8 +327,9 @@ future<> compact_sstables(std::vector sstables, // - add support to merge summary (message: Partition merge counts were {%s}.). // - there is no easy way, currently, to know the exact number of total partitions. // By the time being, using estimated key count. - logger.info("Compacted {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \ + logger.info("{} {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \ "~{} total partitions merged to {}.", + (!cleanup) ? "Compacted" : "Cleaned", stats->sstables, new_sstables_msg, stats->start_size, stats->end_size, (int) (ratio * 100), std::chrono::duration_cast(duration).count(), throughput, stats->total_partitions, stats->total_keys_written); @@ -297,6 +340,11 @@ future<> compact_sstables(std::vector sstables, return make_ready_future<>(); } + // Skip code that updates compaction history if running on behalf of a cleanup job. + if (cleanup) { + return make_ready_future<>(); + } + auto compacted_at = std::chrono::duration_cast(end_time.time_since_epoch()).count(); // FIXME: add support to merged_rows. merged_rows is a histogram that diff --git a/sstables/compaction.hh b/sstables/compaction.hh index ac62386ad6..5280641c40 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -63,9 +63,12 @@ namespace sstables { // Example: It's okay for the size of a new sstable to go beyond max_sstable_size // when writing its last partition. 
// sstable_level will be level of the sstable(s) to be created by this function. + // If cleanup is true, mutations that don't belong to the current node are + // dropped, log messages report that compact_sstables is running a cleanup + // operation, and compaction history is not updated. future<> compact_sstables(std::vector sstables, column_family& cf, std::function creator, - uint64_t max_sstable_size, uint32_t sstable_level); + uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false); // Return the most interesting bucket applying the size-tiered strategy. // NOTE: currently used for purposes of testing. May also be used by leveled compaction strategy.