diff --git a/sstables/compaction.cc b/sstables/compaction.cc
index 8512ebeaa8..1dd96ac8e0 100644
--- a/sstables/compaction.cc
+++ b/sstables/compaction.cc
@@ -198,6 +198,26 @@ protected:
         uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_info->start_size) / _max_sstable_size)));
         return ceil(double(_estimated_partitions) / estimated_sstables);
     }
+
+    // Registers sst as an output of this compaction, records the replay position
+    // and sstable level in its metadata collector, and adds every ancestor to it.
+    void setup_new_sstable(shared_sstable& sst) {
+        _info->new_sstables.push_back(sst);
+        sst->get_metadata_collector().set_replay_position(_rp);
+        sst->get_metadata_collector().sstable_level(_sstable_level);
+        for (auto ancestor : _ancestors) {
+            sst->add_ancestor(ancestor);
+        }
+    }
+
+    // Ends the stream on writer, resets writer to nullopt, opens sst's data and
+    // adds sst's data size to _info->end_size.
+    void finish_new_sstable(stdx::optional& writer, shared_sstable& sst) {
+        writer->consume_end_of_stream();
+        writer = stdx::nullopt;
+        sst->open_data().get0();
+        _info->end_size += sst->data_size();
+    }
 public:
     compaction& operator=(const compaction&) = delete;
     compaction(const compaction&) = delete;
@@ -283,12 +303,7 @@ private:
     virtual sstable_writer* select_sstable_writer(const dht::decorated_key& dk) {
         if (!_writer) {
             _sst = _creator();
-            _info->new_sstables.push_back(_sst);
-            _sst->get_metadata_collector().set_replay_position(_rp);
-            _sst->get_metadata_collector().sstable_level(_sstable_level);
-            for (auto ancestor : _ancestors) {
-                _sst->add_ancestor(ancestor);
-            }
+            setup_new_sstable(_sst);
 
             auto&& priority = service::get_local_compaction_priority();
             sstable_writer_config cfg;
@@ -299,10 +314,7 @@ private:
     }
 
     virtual void stop_sstable_writer() {
-        _writer->consume_end_of_stream();
-        _writer = stdx::nullopt;
-        _sst->open_data().get0();
-        _info->end_size += _sst->data_size();
+        finish_new_sstable(_writer, _sst);
     }
 
     virtual void finish_sstable_writer() {
@@ -417,6 +429,71 @@ public:
 };
 
 
+// Compaction that routes each partition to a per-shard output sstable, picked
+// by dht::shard_of() on the partition's token.
+class resharding_compaction final : public compaction {
+    std::vector>> _output_sstables;
+    shard_id _shard; // shard of current sstable writer
+    std::function _sstable_creator;
+public:
+    resharding_compaction(std::vector sstables, column_family& cf, std::function creator,
+            uint64_t max_sstable_size, uint32_t sstable_level)
+        : compaction(cf, std::move(sstables), {}, max_sstable_size, sstable_level)
+        , _output_sstables(smp::count)
+        , _sstable_creator(std::move(creator))
+    {
+    }
+
+    void report_start(const sstring& formatted_msg) const override {
+        logger.info("Resharding {}", formatted_msg);
+    }
+
+    void report_finish(const sstring& formatted_msg, std::chrono::time_point ended_at) const override {
+        logger.info("Resharded {}", formatted_msg);
+    }
+
+    // Always returns api::min_timestamp; presumably so resharding never purges data -- TODO confirm.
+    std::function max_purgeable_func() override {
+        return [] (const dht::decorated_key& dk) {
+            return api::min_timestamp;
+        };
+    }
+
+    sstable_writer* select_sstable_writer(const dht::decorated_key& dk) override {
+        _shard = dht::shard_of(dk.token());
+        auto& sst = _output_sstables[_shard].first;
+        auto& writer = _output_sstables[_shard].second;
+
+        if (!writer) {
+            sst = _sstable_creator(_shard);
+            setup_new_sstable(sst);
+
+            sstable_writer_config cfg;
+            cfg.max_sstable_size = _max_sstable_size;
+            auto&& priority = service::get_local_compaction_priority();
+            writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority, _shard));
+        }
+        return &*writer;
+    }
+
+    // Finishes only the writer of the shard chosen by the last select_sstable_writer() call.
+    void stop_sstable_writer() override {
+        auto& sst = _output_sstables[_shard].first;
+        auto& writer = _output_sstables[_shard].second;
+
+        finish_new_sstable(writer, sst);
+    }
+
+    // Finishes every per-shard writer that is still engaged.
+    void finish_sstable_writer() override {
+        for (auto& p : _output_sstables) {
+            if (p.second) {
+                finish_new_sstable(p.second, p.first);
+            }
+        }
+    }
+};
+
 future> compaction::run(std::unique_ptr c) {
     return seastar::async([c = std::move(c)] () mutable {
         auto reader = c->setup();
@@ -459,6 +536,18 @@ compact_sstables(std::vector sstables, column_family& cf, std::f
     return compaction::run(std::move(c));
 }
 
+// Runs a resharding_compaction over sstables; throws std::runtime_error when
+// the input set is empty.
+future>
+reshard_sstables(std::vector sstables, column_family& cf, std::function creator,
+        uint64_t max_sstable_size, uint32_t sstable_level) {
+    if (sstables.empty()) {
+        throw std::runtime_error(sprint("Called resharding with empty set on behalf of %s.%s", cf.schema()->ks_name(), cf.schema()->cf_name()));
+    }
+    auto c = std::make_unique(std::move(sstables), cf, std::move(creator), max_sstable_size, sstable_level);
+    return compaction::run(std::move(c));
+}
+
 std::vector
 get_fully_expired_sstables(column_family& cf, std::vector& compacting, int32_t gc_before) {
     logger.debug("Checking droppable sstables in {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
diff --git a/sstables/compaction.hh b/sstables/compaction.hh
index d6ae41c25b..0ff3f631e1 100644
--- a/sstables/compaction.hh
+++ b/sstables/compaction.hh
@@ -104,6 +104,12 @@ namespace sstables {
             column_family& cf, std::function creator,
             uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false);
 
+    // Compacts a set of N shared sstables into M sstables. For every shard involved,
+    // i.e. which owns any of the sstables, a new unshared sstable is created.
+    future> reshard_sstables(std::vector sstables,
+            column_family& cf, std::function creator,
+            uint64_t max_sstable_size, uint32_t sstable_level);
+
     // Return the most interesting bucket applying the size-tiered strategy.
     std::vector size_tiered_most_interesting_bucket(lw_shared_ptr candidates);
 