From 01273098201635458f295c730c745f4e5d40c9d5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 20 Mar 2017 18:45:00 -0300 Subject: [PATCH] sstables: extend compaction for new resharding Extends compaction for new resharding algorithm. Not wired yet. New resharding will compact shared sstable(s) and create one sstable for each owner. It's up to the caller to open these new unshared sstables at their respective column families. This new approach will save a lot of bandwidth because we'll no longer read the entire shared sstable #smp::count times. Signed-off-by: Raphael S. Carvalho --- sstables/compaction.cc | 98 +++++++++++++++++++++++++++++++++++++----- sstables/compaction.hh | 6 +++ 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 8512ebeaa8..1dd96ac8e0 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -198,6 +198,22 @@ protected: uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_info->start_size) / _max_sstable_size))); return ceil(double(_estimated_partitions) / estimated_sstables); } + + void setup_new_sstable(shared_sstable& sst) { + _info->new_sstables.push_back(sst); + sst->get_metadata_collector().set_replay_position(_rp); + sst->get_metadata_collector().sstable_level(_sstable_level); + for (auto ancestor : _ancestors) { + sst->add_ancestor(ancestor); + } + } + + void finish_new_sstable(stdx::optional& writer, shared_sstable& sst) { + writer->consume_end_of_stream(); + writer = stdx::nullopt; + sst->open_data().get0(); + _info->end_size += sst->data_size(); + } public: compaction& operator=(const compaction&) = delete; compaction(const compaction&) = delete; @@ -283,12 +299,7 @@ private: virtual sstable_writer* select_sstable_writer(const dht::decorated_key& dk) { if (!_writer) { _sst = _creator(); - _info->new_sstables.push_back(_sst); - _sst->get_metadata_collector().set_replay_position(_rp); - _sst->get_metadata_collector().sstable_level(_sstable_level); - for (auto ancestor : _ancestors) { - _sst->add_ancestor(ancestor); - } + setup_new_sstable(_sst); auto&& priority = service::get_local_compaction_priority(); sstable_writer_config cfg; @@ -299,10 +310,7 @@ private: } virtual void stop_sstable_writer() { - _writer->consume_end_of_stream(); - _writer = stdx::nullopt; - _sst->open_data().get0(); - _info->end_size += _sst->data_size(); + finish_new_sstable(_writer, _sst); } virtual void finish_sstable_writer() { @@ -417,6 +425,66 @@ public: }; +class resharding_compaction final : public compaction { + std::vector>> _output_sstables; + shard_id _shard; // shard of current sstable writer + std::function _sstable_creator; +public: + resharding_compaction(std::vector sstables, column_family& cf, std::function creator, + uint64_t max_sstable_size, uint32_t sstable_level) + : compaction(cf, std::move(sstables), {}, max_sstable_size, sstable_level) + , _output_sstables(smp::count) + , _sstable_creator(std::move(creator)) + { + } + + void report_start(const sstring& formatted_msg) const override { + logger.info("Resharding {}", formatted_msg); + } + + void report_finish(const sstring& formatted_msg, std::chrono::time_point ended_at) const override { + logger.info("Resharded {}", formatted_msg); + } + + std::function max_purgeable_func() override { + return [] (const dht::decorated_key& dk) { + return api::min_timestamp; + }; + } + + sstable_writer* select_sstable_writer(const dht::decorated_key& dk) override { + _shard = dht::shard_of(dk.token()); + auto& sst = _output_sstables[_shard].first; + auto& writer = _output_sstables[_shard].second; + + if (!writer) { + sst = _sstable_creator(_shard); + setup_new_sstable(sst); + + sstable_writer_config cfg; + cfg.max_sstable_size = _max_sstable_size; + auto&& priority = service::get_local_compaction_priority(); + writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority, _shard)); + } + return &*writer; + } + + void stop_sstable_writer() override { + auto& sst = _output_sstables[_shard].first; + auto& writer = _output_sstables[_shard].second; + + finish_new_sstable(writer, sst); + } + + void finish_sstable_writer() override { + for (auto& p : _output_sstables) { + if (p.second) { + finish_new_sstable(p.second, p.first); + } + } + } +}; + future> compaction::run(std::unique_ptr c) { return seastar::async([c = std::move(c)] () mutable { auto reader = c->setup(); @@ -459,6 +527,16 @@ compact_sstables(std::vector sstables, column_family& cf, std::f return compaction::run(std::move(c)); } +future> +reshard_sstables(std::vector sstables, column_family& cf, std::function creator, + uint64_t max_sstable_size, uint32_t sstable_level) { + if (sstables.empty()) { + throw std::runtime_error(sprint("Called resharding with empty set on behalf of {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name())); + } + auto c = std::make_unique(std::move(sstables), cf, std::move(creator), max_sstable_size, sstable_level); + return compaction::run(std::move(c)); +} + std::vector get_fully_expired_sstables(column_family& cf, std::vector& compacting, int32_t gc_before) { logger.debug("Checking droppable sstables in {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name()); diff --git a/sstables/compaction.hh b/sstables/compaction.hh index d6ae41c25b..0ff3f631e1 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -104,6 +104,12 @@ namespace sstables { column_family& cf, std::function creator, uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false); + // Compacts a set of N shared sstables into M sstables. For every shard involved, + // i.e. which owns any of the sstables, a new unshared sstable is created. + future> reshard_sstables(std::vector sstables, + column_family& cf, std::function creator, + uint64_t max_sstable_size, uint32_t sstable_level); + // Return the most interesting bucket applying the size-tiered strategy. std::vector size_tiered_most_interesting_bucket(lw_shared_ptr candidates);