diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index d502cf1892..1204a2eaf3 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -411,6 +411,56 @@ future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition }); } +// Creates a stream which is like r but with transformation applied to the elements. +template +GCC6_CONCEPT( + requires StreamedMutationTranformer() +) +flat_mutation_reader transform(flat_mutation_reader r, T t) { + class transforming_reader : public flat_mutation_reader::impl { + flat_mutation_reader _reader; + T _t; + struct consumer { + transforming_reader* _owner; + stop_iteration operator()(mutation_fragment&& mf) { + _owner->push_mutation_fragment(_owner->_t(std::move(mf))); + return stop_iteration(_owner->is_buffer_full()); + } + }; + public: + transforming_reader(flat_mutation_reader&& r, T&& t) + : impl(t(r.schema())) + , _reader(std::move(r)) + , _t(std::move(t)) + {} + virtual future<> fill_buffer() override { + if (_end_of_stream) { + return make_ready_future<>(); + } + return _reader.consume_pausable(consumer{this}).then([this] { + if (_reader.is_end_of_stream() && _reader.is_buffer_empty()) { + _end_of_stream = true; + } + }); + } + virtual void next_partition() override { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + _reader.next_partition(); + } + } + virtual future<> fast_forward_to(const dht::partition_range& pr) override { + clear_buffer(); + _end_of_stream = false; + return _reader.fast_forward_to(pr); + } + virtual future<> fast_forward_to(position_range pr) override { + throw std::bad_function_call(); + } + }; + return make_flat_mutation_reader(std::move(r), std::move(t)); +} + flat_mutation_reader flat_mutation_reader_from_mutation_reader(schema_ptr, mutation_reader&&, streamed_mutation::forwarding); flat_mutation_reader make_forwardable(flat_mutation_reader m);