Create transform for flat_mutation_reader.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2017-12-07 10:20:20 +01:00
parent a322268416
commit 8a275dfaeb

View File

@@ -411,6 +411,56 @@ future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition
});
}
// Creates a stream which is like r but with transformation applied to the elements.
template<typename T>
GCC6_CONCEPT(
requires StreamedMutationTranformer<T>()
)
flat_mutation_reader transform(flat_mutation_reader r, T t) {
class transforming_reader : public flat_mutation_reader::impl {
flat_mutation_reader _reader;
T _t;
struct consumer {
transforming_reader* _owner;
stop_iteration operator()(mutation_fragment&& mf) {
_owner->push_mutation_fragment(_owner->_t(std::move(mf)));
return stop_iteration(_owner->is_buffer_full());
}
};
public:
transforming_reader(flat_mutation_reader&& r, T&& t)
: impl(t(r.schema()))
, _reader(std::move(r))
, _t(std::move(t))
{}
virtual future<> fill_buffer() override {
if (_end_of_stream) {
return make_ready_future<>();
}
return _reader.consume_pausable(consumer{this}).then([this] {
if (_reader.is_end_of_stream() && _reader.is_buffer_empty()) {
_end_of_stream = true;
}
});
}
virtual void next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_reader.next_partition();
}
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
_end_of_stream = false;
return _reader.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range pr) override {
throw std::bad_function_call();
}
};
return make_flat_mutation_reader<transforming_reader>(std::move(r), std::move(t));
}
flat_mutation_reader flat_mutation_reader_from_mutation_reader(schema_ptr, mutation_reader&&, streamed_mutation::forwarding);
flat_mutation_reader make_forwardable(flat_mutation_reader m);