From 18599f26faecbdd34f45157e9e893ca6032ff49e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 2 Nov 2021 08:23:16 +0200 Subject: [PATCH] mutation_writer/partition_based_splitting_writer: add memtable-based segregator The current method of segregating partitions doesn't work well for huge number of small partitions. For especially bad input, it can produce hundreds or even thousands of buckets. This patch adds a new segregator specialized for this use-case. This segregator uses a memtable to sort out-of-order partitions in-memory. When the memtable size reaches the provided max-memory limit, it is flushed to disk and a new empty one is created. In-order partitions bypass the sorting altogether and go to the fast-path bucket. The new method is not used yet, this will come in the next patch. --- .../partition_based_splitting_writer.cc | 118 ++++++++++++++++++ .../partition_based_splitting_writer.hh | 18 +++ 2 files changed, 136 insertions(+) diff --git a/mutation_writer/partition_based_splitting_writer.cc b/mutation_writer/partition_based_splitting_writer.cc index 4d57f3ea41..221d7f9f18 100644 --- a/mutation_writer/partition_based_splitting_writer.cc +++ b/mutation_writer/partition_based_splitting_writer.cc @@ -20,6 +20,7 @@ */ #include "mutation_writer/partition_based_splitting_writer.hh" +#include "memtable.hh" #include @@ -138,4 +139,121 @@ future<> segregate_by_partition(flat_mutation_reader producer, unsigned max_buck } } +class partition_sorting_mutation_writer { + schema_ptr _schema; + reader_permit _permit; + reader_consumer _consumer; + const io_priority_class& _pc; + size_t _max_memory; + bucket_writer _bucket_writer; + std::optional _last_bucket_key; + lw_shared_ptr _memtable; + future<> _background_memtable_flush = make_ready_future<>(); + std::optional _current_mut; + size_t _current_mut_size = 0; + +private: + void init_current_mut(dht::decorated_key key, tombstone tomb) { + _current_mut_size = key.external_memory_usage(); + 
_current_mut.emplace(_schema, std::move(key)); + _current_mut->partition().apply(tomb); + } + + future<> flush_memtable() { + co_await _consumer(_memtable->make_flush_reader(_schema, _permit, _pc)); + _memtable = make_lw_shared(_schema); + } + + future<> maybe_flush_memtable() { + if (_memtable->occupancy().total_space() + _current_mut_size > _max_memory) { + if (_current_mut) { + _memtable->apply(*_current_mut); + init_current_mut(_current_mut->decorated_key(), _current_mut->partition().partition_tombstone()); + } + return flush_memtable(); + } + return make_ready_future<>(); + } + + future<> write(mutation_fragment mf) { + if (!_current_mut) { + return _bucket_writer.consume(std::move(mf)); + } + _current_mut_size += mf.memory_usage(); + _current_mut->apply(mf); + return maybe_flush_memtable(); + } + +public: + partition_sorting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer, const segregate_config& cfg) + : _schema(std::move(schema)) + , _permit(std::move(permit)) + , _consumer(std::move(consumer)) + , _pc(cfg.pc) + , _max_memory(cfg.max_memory) + , _bucket_writer(_schema, _permit, _consumer) + , _memtable(make_lw_shared(_schema)) + { } + + future<> consume(partition_start ps) { + // Fast path: in-order partitions go directly to the output sstable + if (!_last_bucket_key || dht::ring_position_tri_compare(*_schema, *_last_bucket_key, ps.key()) < 0) { + _last_bucket_key = ps.key(); + return _bucket_writer.consume(mutation_fragment(*_schema, _permit, std::move(ps))); + } + init_current_mut(ps.key(), ps.partition_tombstone()); + return make_ready_future<>(); + } + + future<> consume(static_row&& sr) { + return write(mutation_fragment(*_schema, _permit, std::move(sr))); + } + + future<> consume(clustering_row&& cr) { + return write(mutation_fragment(*_schema, _permit, std::move(cr))); + } + + future<> consume(range_tombstone&& rt) { + return write(mutation_fragment(*_schema, _permit, std::move(rt))); + } + + future<> 
consume(partition_end&& pe) { + if (!_current_mut) { + return _bucket_writer.consume(mutation_fragment(*_schema, _permit, std::move(pe))); + } + _memtable->apply(*_current_mut); + _current_mut.reset(); + _current_mut_size = 0; + return maybe_flush_memtable(); + } + + void consume_end_of_stream() { + _bucket_writer.consume_end_of_stream(); + if (!_memtable->empty()) { + _background_memtable_flush = flush_memtable(); + } + } + void abort(std::exception_ptr ep) { + _bucket_writer.abort(std::move(ep)); + } + future<> close() noexcept { + return when_all_succeed(_bucket_writer.close(), std::move(_background_memtable_flush)).discard_result(); + } +}; + +future<> segregate_by_partition(flat_mutation_reader producer, segregate_config cfg, reader_consumer consumer) { + auto schema = producer.schema(); + auto permit = producer.permit(); + try { + return feed_writer(std::move(producer), + partition_sorting_mutation_writer(std::move(schema), std::move(permit), std::move(consumer), std::move(cfg))); + } catch (...) { + return producer.close().then([ex = std::current_exception()] () mutable { + return make_exception_future<>(std::move(ex)); + }); + } +} + + + } // namespace mutation_writer diff --git a/mutation_writer/partition_based_splitting_writer.hh b/mutation_writer/partition_based_splitting_writer.hh index 8fa5d0d67d..db7dbae355 100644 --- a/mutation_writer/partition_based_splitting_writer.hh +++ b/mutation_writer/partition_based_splitting_writer.hh @@ -40,4 +40,22 @@ namespace mutation_writer { // created. future<> segregate_by_partition(flat_mutation_reader producer, unsigned max_buckets, reader_consumer consumer); +struct segregate_config { + // For flushing the memtable which does the in-memory segregation (sorting) + // part. + const io_priority_class& pc; + // Maximum amount of memory to be used by the in-memory segregation + // (sorting) structures. 
Partitions may be split across multiple memtable flushes if this limit is reached in the middle of a partition. + size_t max_memory; +}; + +// Given a producer that may contain partitions in the wrong order, or even +// contain partitions multiple times, separate them such that each output +// stream keeps the partition ordering guarantee. In other words, repair +// a stream that violates the ordering requirements by splitting it into output +// streams that honor it. +// This is useful for scrub compaction to split sstables containing out-of-order +// and/or duplicate partitions into sstables that honor the partition ordering. +future<> segregate_by_partition(flat_mutation_reader producer, segregate_config cfg, reader_consumer consumer); + } // namespace mutation_writer