diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index a97d71e781..0d9e3021d8 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -641,12 +641,17 @@ class flat_multi_range_mutation_reader : public flat_mutation_reader::impl { } public: - flat_multi_range_mutation_reader(schema_ptr s, mutation_source source, Generator generator, - const query::partition_slice& slice, const io_priority_class& pc, - tracing::trace_state_ptr trace_state) + flat_multi_range_mutation_reader( + schema_ptr s, + mutation_source source, + const dht::partition_range& first_range, + Generator generator, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state) : impl(s) , _generator(std::move(generator)) - , _reader(source.make_reader(s, *(*_generator)(), slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes)) + , _reader(source.make_reader(s, first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes)) { } @@ -723,7 +728,52 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa return source.make_reader(std::move(s), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr); } else { return make_flat_mutation_reader>(std::move(s), std::move(source), - adapter(ranges.cbegin(), ranges.cend()), slice, pc, std::move(trace_state)); + ranges.front(), adapter(std::next(ranges.cbegin()), ranges.cend()), slice, pc, std::move(trace_state)); + } +} + +flat_mutation_reader +make_flat_multi_range_reader( + schema_ptr s, + mutation_source source, + std::function()> generator, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + class adapter { + std::function()> _generator; + std::unique_ptr _previous; + std::unique_ptr _current; + + public: + explicit adapter(std::function()> generator) + : _generator(std::move(generator)) + , _previous(std::make_unique(dht::partition_range::make_singular({dht::token{}, partition_key::make_empty()}))) + , _current(std::make_unique(dht::partition_range::make_singular({dht::token{}, partition_key::make_empty()}))) { + } + const dht::partition_range* operator()() { + std::swap(_current, _previous); + if (auto next = _generator()) { + *_current = std::move(*next); + return _current.get(); + } else { + return nullptr; + } + } + }; + + auto adapted_generator = adapter(std::move(generator)); + auto* first_range = adapted_generator(); + if (!first_range) { + if (fwd_mr) { + return make_flat_mutation_reader(std::move(s), std::move(source), slice, pc, std::move(trace_state)); + } else { + return make_empty_flat_reader(std::move(s)); + } + } else { + return make_flat_mutation_reader>(std::move(s), std::move(source), + *first_range, std::move(adapted_generator), slice, pc, std::move(trace_state)); } } diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index e6e729494c..fbe5589e8e 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -616,6 +616,16 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa tracing::trace_state_ptr trace_state = nullptr, flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes); +flat_mutation_reader +make_flat_multi_range_reader( + schema_ptr s, + mutation_source source, + std::function()> generator, + const query::partition_slice& slice, + const io_priority_class& pc = default_priority_class(), + tracing::trace_state_ptr trace_state = nullptr, + flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes); + flat_mutation_reader make_flat_mutation_reader_from_fragments(schema_ptr, std::deque);