diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 7a73d53556..95bf237c92 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -84,7 +84,6 @@ public: private: tracked_buffer _buffer; size_t _buffer_size = 0; - bool _consume_done = false; protected: size_t max_buffer_size_in_bytes = 8 * 1024; bool _end_of_stream = false; @@ -153,16 +152,20 @@ public: // Stops when consumer returns stop_iteration::yes or end of stream is reached. // Next call will start from the next mutation_fragment in the stream. future<> consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout) { - _consume_done = false; - return do_until([this] { return (is_end_of_stream() && is_buffer_empty()) || _consume_done; }, - [this, consumer = std::move(consumer), timeout] () mutable { - if (is_buffer_empty()) { - return fill_buffer(timeout); - } + return do_with(std::move(consumer), [this, timeout] (Consumer& consumer) { + return repeat([this, &consumer, timeout] { + if (is_end_of_stream() && is_buffer_empty()) { + return make_ready_future(stop_iteration::yes); + } - _consume_done = consumer(pop_mutation_fragment()) == stop_iteration::yes; + if (is_buffer_empty()) { + return fill_buffer(timeout).then([] { + return make_ready_future(stop_iteration::no); + }); + } - return make_ready_future<>(); + return make_ready_future(consumer(pop_mutation_fragment())); + }); }); }