diff --git a/configure.py b/configure.py
index 790cd0825e..9b06ec097c 100755
--- a/configure.py
+++ b/configure.py
@@ -329,6 +329,7 @@ scylla_core = (['database.cc',
'mutation_partition_view.cc',
'mutation_partition_serializer.cc',
'mutation_reader.cc',
+ 'flat_mutation_reader.cc',
'mutation_query.cc',
'keys.cc',
'counters.cc',
diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc
new file mode 100644
index 0000000000..74fb92f3cb
--- /dev/null
+++ b/flat_mutation_reader.cc
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#include "flat_mutation_reader.hh"
+
+#include
+
+#include
+
+void flat_mutation_reader::impl::forward_buffer_to(schema_ptr& schema, const position_in_partition& pos) {
+ _buffer.erase(std::remove_if(_buffer.begin(), _buffer.end(), [this, &pos, &schema] (mutation_fragment& f) {
+ return !f.relevant_for_range_assuming_after(*schema, pos);
+ }), _buffer.end());
+
+ _buffer_size = boost::accumulate(_buffer | boost::adaptors::transformed(std::mem_fn(&mutation_fragment::memory_usage)), size_t(0));
+}
\ No newline at end of file
diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh
new file mode 100644
index 0000000000..1da00f359f
--- /dev/null
+++ b/flat_mutation_reader.hh
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "dht/i_partitioner.hh"
+#include "position_in_partition.hh"
+#include "streamed_mutation.hh"
+#include "mutation_reader.hh"
+
+#include
+
+using seastar::future;
+
+GCC6_CONCEPT(
+ template
+ concept bool FlatMutationReaderConsumer() {
+ return requires(Consumer c, mutation_fragment mf) {
+ { c(std::move(mf)) } -> stop_iteration;
+ };
+ }
+)
+
+/*
+ * Allows iteration on mutations using mutation_fragments.
+ * It iterates over mutations one by one and for each mutation
+ * it returns:
+ * 1. partition_start mutation_fragment
+ * 2. static_row mutation_fragment if one exists
+ * 3. mutation_fragments for all clustering rows and range tombstones
+ * in clustering key order
+ * 4. partition_end mutation_fragment
+ * The best way to consume those mutation_fragments is to call
+ * flat_mutation_reader::consume with a consumer that receives the fragments.
+ */
+class flat_mutation_reader final {
+public:
+ class impl {
+ circular_buffer _buffer;
+ size_t _buffer_size = 0;
+ bool _consume_done = false;
+ protected:
+ static constexpr size_t max_buffer_size_in_bytes = 8 * 1024;
+ bool _end_of_stream = false;
+ protected:
+ template
+ void push_mutation_fragment(Args&&... args) {
+ _buffer.emplace_back(std::forward(args)...);
+ _buffer_size += _buffer.back().memory_usage();
+ }
+ void clear_buffer() {
+ _buffer.erase(_buffer.begin(), _buffer.end());
+ _buffer_size = 0;
+ }
+ void forward_buffer_to(schema_ptr& s, const position_in_partition& pos);
+ public:
+ virtual ~impl() {}
+ virtual future<> fill_buffer() = 0;
+ virtual void next_partition() = 0;
+
+ bool is_end_of_stream() const { return _end_of_stream; }
+ bool is_buffer_empty() const { return _buffer.empty(); }
+ bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; }
+
+ mutation_fragment pop_mutation_fragment() {
+ auto mf = std::move(_buffer.front());
+ _buffer.pop_front();
+ _buffer_size -= mf.memory_usage();
+ return mf;
+ }
+
+ future operator()() {
+ if (is_buffer_empty()) {
+ if (is_end_of_stream()) {
+ return make_ready_future();
+ }
+ return fill_buffer().then([this] { return operator()(); });
+ }
+ return make_ready_future(pop_mutation_fragment());
+ }
+
+ template
+ GCC6_CONCEPT(
+ requires FlatMutationReaderConsumer()
+ )
+ future<> consume_pausable(Consumer consumer) {
+ _consume_done = false;
+ return do_until([this] { return (is_end_of_stream() && is_buffer_empty()) || _consume_done; }, [this, consumer = std::move(consumer)] () mutable {
+ if (is_buffer_empty()) {
+ return fill_buffer();
+ }
+
+ _consume_done = consumer(pop_mutation_fragment()) == stop_iteration::yes;
+
+ return make_ready_future<>();
+ });
+ }
+
+ virtual future<> fast_forward_to(const dht::partition_range&) = 0;
+ virtual future<> fast_forward_to(position_range) = 0;
+ };
+private:
+ std::unique_ptr _impl;
+public:
+ flat_mutation_reader(std::unique_ptr impl) noexcept : _impl(std::move(impl)) {}
+
+ future operator()() {
+ return _impl->operator()();
+ }
+
+ template
+ GCC6_CONCEPT(
+ requires FlatMutationReaderConsumer()
+ )
+ auto consume_pausable(Consumer consumer) {
+ return _impl->consume_pausable(std::move(consumer));
+ }
+
+ void next_partition() { _impl->next_partition(); }
+
+ future<> fill_buffer() { return _impl->fill_buffer(); }
+
+ // Changes the range of partitions to pr. The range can only be moved
+ // forwards. pr.begin() needs to be larger than pr.end() of the previousl
+ // used range (i.e. either the initial one passed to the constructor or a
+ // previous fast forward target).
+ // pr needs to be valid until the reader is destroyed or fast_forward_to()
+ // is called again.
+ future<> fast_forward_to(const dht::partition_range& pr) {
+ return _impl->fast_forward_to(pr);
+ }
+ // Skips to a later range of rows.
+ // The new range must not overlap with the current range.
+ //
+ // In forwarding mode the stream does not return all fragments right away,
+ // but only those belonging to the current clustering range. Initially
+ // current range only covers the static row. The stream can be forwarded
+ // (even before end-of- stream) to a later range with fast_forward_to().
+ // Forwarding doesn't change initial restrictions of the stream, it can
+ // only be used to skip over data.
+ //
+ // Monotonicity of positions is preserved by forwarding. That is fragments
+ // emitted after forwarding will have greater positions than any fragments
+ // emitted before forwarding.
+ //
+ // For any range, all range tombstones relevant for that range which are
+ // present in the original stream will be emitted. Range tombstones
+ // emitted before forwarding which overlap with the new range are not
+ // necessarily re-emitted.
+ //
+ // When forwarding mode is not enabled, fast_forward_to()
+ // cannot be used.
+ future<> fast_forward_to(position_range cr) {
+ return _impl->fast_forward_to(std::move(cr));
+ }
+ bool is_end_of_stream() const { return _impl->is_end_of_stream(); }
+ bool is_buffer_empty() const { return _impl->is_buffer_empty(); }
+ bool is_buffer_full() const { return _impl->is_buffer_full(); }
+};
+
+template
+flat_mutation_reader make_flat_mutation_reader(Args &&... args) {
+ return flat_mutation_reader(std::make_unique(std::forward(args)...));
+}