diff --git a/mutation_reader.cc b/mutation_reader.cc index be04ec5a9e..187853e089 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -35,82 +35,64 @@ T move_and_clear(T& obj) { return x; } -// Combines multiple mutation_readers into one. -class combined_reader final : public mutation_reader::impl { - std::vector _readers; - struct mutation_and_reader { - streamed_mutation m; - mutation_reader* read; - }; - std::vector _ptables; - // comparison function for std::make_heap()/std::push_heap() - static bool heap_compare(const mutation_and_reader& a, const mutation_and_reader& b) { - auto&& s = a.m.schema(); - // order of comparison is inverted, because heaps produce greatest value first - return b.m.decorated_key().less_compare(*s, a.m.decorated_key()); - } - std::vector _current; - std::vector _next; -private: - future<> prepare_next() { - return parallel_for_each(_next, [this] (mutation_reader* mr) { - return (*mr)().then([this, mr] (streamed_mutation_opt next) { - if (next) { - _ptables.emplace_back(mutation_and_reader { std::move(*next), mr }); - boost::range::push_heap(_ptables, &heap_compare); - } - }); - }).then([this] { - _next.clear(); - }); - } - // Produces next mutation or disengaged optional if there are no more. - future next() { - if (_current.empty() && !_next.empty()) { - return prepare_next().then([this] { return next(); }); - } - if (_ptables.empty()) { - return make_ready_future(); - }; - - while (!_ptables.empty()) { - boost::range::pop_heap(_ptables, &heap_compare); - auto& candidate = _ptables.back(); - streamed_mutation& m = candidate.m; - - if (!_current.empty() && !_current.back().decorated_key().equal(*m.schema(), m.decorated_key())) { - // key has changed, so emit accumulated mutation +future<> combined_mutation_reader::prepare_next() { + return parallel_for_each(_next, [this] (mutation_reader* mr) { + return (*mr)().then([this, mr] (streamed_mutation_opt next) { + if (next) { + _ptables.emplace_back(mutation_and_reader { std::move(*next), mr }); boost::range::push_heap(_ptables, &heap_compare); - return make_ready_future(merge_mutations(move_and_clear(_current))); } + }); + }).then([this] { + _next.clear(); + }); +} - _current.emplace_back(std::move(m)); - _next.emplace_back(candidate.read); - _ptables.pop_back(); +future combined_mutation_reader::next() { + if (_current.empty() && !_next.empty()) { + return prepare_next().then([this] { return next(); }); + } + if (_ptables.empty()) { + return make_ready_future(); + }; + + while (!_ptables.empty()) { + boost::range::pop_heap(_ptables, &heap_compare); + auto& candidate = _ptables.back(); + streamed_mutation& m = candidate.m; + + if (!_current.empty() && !_current.back().decorated_key().equal(*m.schema(), m.decorated_key())) { + // key has changed, so emit accumulated mutation + boost::range::push_heap(_ptables, &heap_compare); + return make_ready_future(merge_mutations(move_and_clear(_current))); } - return make_ready_future(merge_mutations(move_and_clear(_current))); - } -public: - combined_reader(std::vector readers) - : _readers(std::move(readers)) - { - _next.reserve(_readers.size()); - _current.reserve(_readers.size()); - _ptables.reserve(_readers.size()); - for (auto&& r : _readers) { - _next.emplace_back(&r); - } + _current.emplace_back(std::move(m)); + _next.emplace_back(candidate.read); + _ptables.pop_back(); } + return make_ready_future(merge_mutations(move_and_clear(_current))); +} - virtual future operator()() override { - return next(); +combined_mutation_reader::combined_mutation_reader(std::vector readers) + : _readers(std::move(readers)) +{ + _next.reserve(_readers.size()); + _current.reserve(_readers.size()); + _ptables.reserve(_readers.size()); + + for (auto&& r : _readers) { + _next.emplace_back(&r); } -}; +} + +future combined_mutation_reader::operator()() { + return next(); +} mutation_reader make_combined_reader(std::vector readers) { - return make_mutation_reader(std::move(readers)); + return make_mutation_reader(std::move(readers)); } mutation_reader diff --git a/mutation_reader.hh b/mutation_reader.hh index 5e7fed3875..66b1619782 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -91,6 +91,31 @@ make_mutation_reader(Args&&... args) { return mutation_reader(std::make_unique(std::forward(args)...)); } +// Combines multiple mutation_readers into one. +class combined_mutation_reader final : public mutation_reader::impl { + std::vector _readers; + struct mutation_and_reader { + streamed_mutation m; + mutation_reader* read; + }; + std::vector _ptables; + // comparison function for std::make_heap()/std::push_heap() + static bool heap_compare(const mutation_and_reader& a, const mutation_and_reader& b) { + auto&& s = a.m.schema(); + // order of comparison is inverted, because heaps produce greatest value first + return b.m.decorated_key().less_compare(*s, a.m.decorated_key()); + } + std::vector _current; + std::vector _next; +private: + future<> prepare_next(); + // Produces next mutation or disengaged optional if there are no more. + future next(); +public: + combined_mutation_reader(std::vector readers); + virtual future operator()() override; +}; + // Creates a mutation reader which combines data return by supplied readers. // Returns mutation of the same schema only when all readers return mutations // of the same schema.