From a1fc5888d3df15ca283218bbd1cec8b7b4fa4d36 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Dziepak?=
Date: Tue, 24 May 2016 02:35:59 +0100
Subject: [PATCH] streamed_mutation: add mutation_merger
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Paweł Dziepak
---
 streamed_mutation.cc | 196 +++++++++++++++++++++++++++++++++++++++++++
 streamed_mutation.hh |   5 +-
 2 files changed, 200 insertions(+), 1 deletion(-)

diff --git a/streamed_mutation.cc b/streamed_mutation.cc
index 279da58f70..721b533ac9 100644
--- a/streamed_mutation.cc
+++ b/streamed_mutation.cc
@@ -23,6 +23,7 @@
 
 #include "mutation.hh"
 #include "streamed_mutation.hh"
+#include "utils/move.hh"
 
 mutation_fragment::mutation_fragment(static_row&& r)
     : _kind(kind::static_row), _data(std::make_unique())
@@ -214,3 +215,198 @@ streamed_mutation streamed_mutation_from_mutation(mutation m)
 
     return make_streamed_mutation(std::move(m));
 }
+
+// Merges several streamed_mutations into a single stream of fragments ordered
+// by position_in_partition. Fragments found at the same position are combined
+// with mutation_fragment::apply(). Range tombstone markers are emitted only
+// when the greatest tombstone open across all inputs changes.
+class mutation_merger final : public streamed_mutation::impl {
+    // Owns the input streams; streamed_reader::reader points at its elements.
+    std::vector<streamed_mutation> _original_readers;
+    struct streamed_reader {
+        // Tombstone of the range tombstone currently open in this input, if any.
+        tombstone current_tombstone;
+        streamed_mutation* reader;
+    };
+    // Inputs whose next fragment has not been moved to _readers yet.
+    std::vector<streamed_reader> _next_readers;
+    // FIXME: do not store all in-flight clustering rows in memory
+    struct row_and_reader {
+        mutation_fragment row;
+        streamed_reader reader;
+    };
+    // Heap of the pending fragment of each input; the fragment with the
+    // smallest position is at the front.
+    std::vector<row_and_reader> _readers;
+    // Tombstone of the range tombstone currently open in the output, if any.
+    tombstone _current_tombstone;
+private:
+    // Records in sr whether mf opens or closes a range tombstone of that input.
+    static void update_current_tombstone(streamed_reader& sr, mutation_fragment& mf) {
+        if (mf.is_range_tombstone_begin()) {
+            assert(!sr.current_tombstone);
+            sr.current_tombstone = mf.as_range_tombstone_begin().tomb();
+        } else if (mf.is_range_tombstone_end()) {
+            assert(sr.current_tombstone);
+            sr.current_tombstone = { };
+        }
+    }
+
+    // Takes the smallest pending fragment, merges every other pending fragment
+    // at the same position into it and pushes the outcome to the buffer.
+    // Marks the end of stream if there are no pending fragments.
+    void read_next() {
+        if (_readers.empty()) {
+            _end_of_stream = true;
+            return;
+        }
+
+        position_in_partition::less_compare cmp(*_schema);
+        auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row, a.row); };
+
+        boost::range::pop_heap(_readers, heap_compare);
+        auto result = std::move(_readers.back().row);
+        update_current_tombstone(_readers.back().reader, result);
+        _next_readers.emplace_back(std::move(_readers.back().reader));
+        _readers.pop_back();
+
+        while (!_readers.empty()) {
+            // The front of the heap is never smaller than result, so if it is
+            // not greater either, it is at the same position.
+            if (cmp(result, _readers.front().row)) {
+                break;
+            }
+            boost::range::pop_heap(_readers, heap_compare);
+            update_current_tombstone(_readers.back().reader, _readers.back().row);
+            result.apply(*_schema, std::move(_readers.back().row));
+            _next_readers.emplace_back(std::move(_readers.back().reader));
+            _readers.pop_back();
+        }
+
+        bool can_emit_result = true;
+        if (result.is_range_tombstone_begin()) {
+            auto new_t = result.as_range_tombstone_begin().tomb();
+            // A tombstone which is not greater than the one already open in
+            // the output is not emitted.
+            can_emit_result = _current_tombstone < new_t;
+            if (can_emit_result) {
+                if (_current_tombstone) {
+                    // Close the tombstone open so far at the same bound.
+                    auto& rtb = result.as_range_tombstone_begin();
+                    auto rte = range_tombstone_end(rtb.key(), invert_kind(rtb.kind()));
+                    push_mutation_fragment(std::move(rte));
+                }
+                push_mutation_fragment(std::move(result));
+                _current_tombstone = new_t;
+            }
+        } else if (result.is_range_tombstone_end()) {
+            // Greatest tombstone still open in any of the inputs.
+            tombstone new_t;
+            for (auto& r_a_r : _readers) {
+                new_t = std::max(new_t, r_a_r.reader.current_tombstone);
+            }
+            for (auto& r : _next_readers) {
+                new_t = std::max(new_t, r.current_tombstone);
+            }
+            can_emit_result = new_t != _current_tombstone;
+            if (can_emit_result) {
+                if (new_t) {
+                    // Another tombstone is still open: start it right after
+                    // the end marker, at the same bound.
+                    auto& rte = result.as_range_tombstone_end();
+                    auto rtb = range_tombstone_begin(rte.key(), invert_kind(rte.kind()), new_t);
+                    push_mutation_fragment(std::move(result));
+                    push_mutation_fragment(std::move(rtb));
+                } else {
+                    push_mutation_fragment(std::move(result));
+                }
+                _current_tombstone = new_t;
+            }
+        } else {
+            push_mutation_fragment(std::move(result));
+        }
+    }
+
+    // Moves each reader from _next_readers to the heap together with its next
+    // fragment, dropping readers that reached the end of stream, and then calls
+    // read_next(). Every reader must have buffered data or be at end of stream.
+    void do_fill_buffer() {
+        position_in_partition::less_compare cmp(*_schema);
+        auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row, a.row); };
+
+        for (auto& rd : _next_readers) {
+            if (rd.reader->is_buffer_empty()) {
+                assert(rd.reader->is_end_of_stream());
+                continue;
+            }
+            _readers.emplace_back(row_and_reader { rd.reader->pop_mutation_fragment(), std::move(rd) });
+            boost::range::push_heap(_readers, heap_compare);
+        }
+        _next_readers.clear();
+
+        read_next();
+    }
+    // Merges for as long as the buffer has room and no reader has to be asked
+    // for more data.
+    void prefill_buffer() {
+        while (!is_end_of_stream() && !is_buffer_full()) {
+            for (auto& rd : _next_readers) {
+                if (rd.reader->is_buffer_empty() && !rd.reader->is_end_of_stream()) {
+                    return;
+                }
+            }
+            do_fill_buffer();
+        }
+    }
+
+    // Returns the partition tombstones of all readers applied to each other.
+    static tombstone merge_partition_tombstones(const std::vector<streamed_mutation>& readers) {
+        tombstone t;
+        for (auto& r : readers) {
+            t.apply(r.partition_tombstone());
+        }
+        return t;
+    }
+protected:
+    // Refills the buffers of all readers that ran dry and continues merging once
+    // they are ready; repeats until the buffer is full or the stream ends.
+    virtual future<> fill_buffer() override {
+        if (_next_readers.empty()) {
+            return make_ready_future<>();
+        }
+        while (!is_end_of_stream() && !is_buffer_full()) {
+            std::vector<future<>> more_data;
+            for (auto& rd : _next_readers) {
+                if (rd.reader->is_buffer_empty() && !rd.reader->is_end_of_stream()) {
+                    more_data.emplace_back(rd.reader->fill_buffer());
+                }
+            }
+            if (!more_data.empty()) {
+                return parallel_for_each(std::move(more_data), [] (auto& f) { return std::move(f); }).then([this] { return fill_buffer(); });
+            }
+            do_fill_buffer();
+        }
+        return make_ready_future<>();
+    }
+public:
+    // Takes ownership of readers and merges whatever is already buffered in them.
+    mutation_merger(schema_ptr s, dht::decorated_key dk, std::vector<streamed_mutation> readers)
+        : streamed_mutation::impl(std::move(s), std::move(dk), merge_partition_tombstones(readers))
+        , _original_readers(std::move(readers))
+    {
+        _next_readers.reserve(_original_readers.size());
+        _readers.reserve(_original_readers.size());
+        for (auto& rd : _original_readers) {
+            _next_readers.emplace_back(streamed_reader { { }, &rd });
+        }
+        prefill_buffer();
+    }
+};
+
+// Merges ms into a single stream. ms must not be empty; the schema and the
+// partition key of the result are taken from its last element.
+streamed_mutation merge_mutations(std::vector<streamed_mutation> ms)
+{
+    assert(!ms.empty());
+    return make_streamed_mutation<mutation_merger>(ms.back().schema(), ms.back().decorated_key(), std::move(ms));
+}
diff --git a/streamed_mutation.hh b/streamed_mutation.hh
index 876e42ff7f..66612eea8a 100644
--- a/streamed_mutation.hh
+++ b/streamed_mutation.hh
@@ -535,4 +535,7 @@ auto consume(streamed_mutation& m, Consumer consumer) {
 
 class mutation;
 
-streamed_mutation streamed_mutation_from_mutation(mutation);
\ No newline at end of file
+streamed_mutation streamed_mutation_from_mutation(mutation);
+
+// Requires a non-empty vector of streamed_mutations which all have the same schema.
+streamed_mutation merge_mutations(std::vector<streamed_mutation>);