flat_mutation_reader: add v2 variant of make_generating_reader()

This commit is contained in:
Botond Dénes
2022-01-28 13:13:23 +02:00
parent bbf8e26a3a
commit 7a119080ee
2 changed files with 43 additions and 0 deletions

View File

@@ -1513,6 +1513,46 @@ flat_mutation_reader make_generating_reader(schema_ptr s, reader_permit permit,
return make_flat_mutation_reader<generating_reader>(std::move(s), std::move(permit), std::move(get_next_fragment));
}
/*
* This reader takes a get_next_fragment generator that produces mutation_fragment_opt which is returned by
* generating_reader.
*/
class generating_reader_v2 final : public flat_mutation_reader_v2::impl {
std::function<future<mutation_fragment_v2_opt> ()> _get_next_fragment;
public:
generating_reader_v2(schema_ptr s, reader_permit permit, std::function<future<mutation_fragment_v2_opt> ()> get_next_fragment)
: impl(std::move(s), std::move(permit)), _get_next_fragment(std::move(get_next_fragment))
{ }
virtual future<> fill_buffer() override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return _get_next_fragment().then([this] (mutation_fragment_v2_opt mopt) {
if (!mopt) {
_end_of_stream = true;
} else {
push_mutation_fragment(std::move(*mopt));
}
});
});
}
virtual future<> next_partition() override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(const dht::partition_range&) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(position_range) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
return make_ready_future<>();
}
};
flat_mutation_reader_v2 make_generating_reader(schema_ptr s, reader_permit permit, std::function<future<mutation_fragment_v2_opt> ()> get_next_fragment) {
return make_flat_mutation_reader_v2<generating_reader_v2>(std::move(s), std::move(permit), std::move(get_next_fragment));
}
void flat_mutation_reader::do_upgrade_schema(const schema_ptr& s) {
*this = transform(std::move(*this), schema_upgrader(s));
}

View File

@@ -815,6 +815,9 @@ make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<m
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<mutation_fragment_v2>, const dht::partition_range& pr, const query::partition_slice& slice);
flat_mutation_reader_v2
make_generating_reader(schema_ptr s, reader_permit permit, std::function<future<mutation_fragment_v2_opt> ()> get_next_fragment);
/// A cosumer function that is passed a flat_mutation_reader to be consumed from
/// and returns a future<> resolved when the reader is fully consumed, and closed.
/// Note: the function assumes ownership of the reader and must close it in all cases.