diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 615194debe..3e1982d6ba 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -1513,6 +1513,46 @@ flat_mutation_reader make_generating_reader(schema_ptr s, reader_permit permit, return make_flat_mutation_reader(std::move(s), std::move(permit), std::move(get_next_fragment)); } + +/* + * This reader takes a get_next_fragment generator that produces mutation_fragment_opt which is returned by + * generating_reader. + */ +class generating_reader_v2 final : public flat_mutation_reader_v2::impl { + std::function ()> _get_next_fragment; +public: + generating_reader_v2(schema_ptr s, reader_permit permit, std::function ()> get_next_fragment) + : impl(std::move(s), std::move(permit)), _get_next_fragment(std::move(get_next_fragment)) + { } + virtual future<> fill_buffer() override { + return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] { + return _get_next_fragment().then([this] (mutation_fragment_v2_opt mopt) { + if (!mopt) { + _end_of_stream = true; + } else { + push_mutation_fragment(std::move(*mopt)); + } + }); + }); + } + virtual future<> next_partition() override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> fast_forward_to(const dht::partition_range&) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> fast_forward_to(position_range) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> close() noexcept override { + return make_ready_future<>(); + } +}; + +flat_mutation_reader_v2 make_generating_reader(schema_ptr s, reader_permit permit, std::function ()> get_next_fragment) { + return make_flat_mutation_reader_v2(std::move(s), std::move(permit), std::move(get_next_fragment)); +} + void flat_mutation_reader::do_upgrade_schema(const schema_ptr& s) { *this = transform(std::move(*this), schema_upgrader(s)); } diff --git a/flat_mutation_reader_v2.hh b/flat_mutation_reader_v2.hh index 42c3f4a51b..95d47744ef 100644 --- a/flat_mutation_reader_v2.hh +++ b/flat_mutation_reader_v2.hh @@ -815,6 +815,9 @@ make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque, const dht::partition_range& pr, const query::partition_slice& slice); +flat_mutation_reader_v2 +make_generating_reader(schema_ptr s, reader_permit permit, std::function ()> get_next_fragment); + /// A cosumer function that is passed a flat_mutation_reader to be consumed from /// and returns a future<> resolved when the reader is fully consumed, and closed. /// Note: the function assumes ownership of the reader and must close it in all cases.