/* * Copyright (C) 2015 ScyllaDB */ /* * This file is part of Scylla. * * Scylla is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Scylla is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Scylla. If not, see . */ #include #include #include "mutation_reader.hh" #include "core/future-util.hh" #include "utils/move.hh" namespace stdx = std::experimental; // Combines multiple mutation_readers into one. class combined_reader final : public mutation_reader::impl { std::vector _readers; struct mutation_and_reader { mutation m; mutation_reader* read; }; std::vector _ptables; // comparison function for std::make_heap()/std::push_heap() static bool heap_compare(const mutation_and_reader& a, const mutation_and_reader& b) { auto&& s = a.m.schema(); // order of comparison is inverted, because heaps produce greatest value first return b.m.decorated_key().less_compare(*s, a.m.decorated_key()); } mutation_opt _current; bool _inited = false; private: // Produces next mutation or disengaged optional if there are no more. // // Entry conditions: // - either _ptables is empty or_ptables.back() is the next item to be consumed. // - the _ptables heap is in invalid state (if not empty), waiting for pop_back or push_heap. future next() { if (_ptables.empty()) { return make_ready_future(move_and_disengage(_current)); }; auto& candidate = _ptables.back(); mutation& m = candidate.m; if (_current && !_current->decorated_key().equal(*m.schema(), m.decorated_key())) { // key has changed, so emit accumulated mutation return make_ready_future(move_and_disengage(_current)); } apply(_current, std::move(m)); return (*candidate.read)().then([this] (mutation_opt&& more) { // Restore heap to valid state if (!more) { _ptables.pop_back(); } else { _ptables.back().m = std::move(*more); boost::range::push_heap(_ptables, &heap_compare); } boost::range::pop_heap(_ptables, &heap_compare); return next(); }); } public: combined_reader(std::vector readers) : _readers(std::move(readers)) { } virtual future operator()() override { if (!_inited) { return parallel_for_each(_readers, [this] (mutation_reader& reader) { return reader().then([this, &reader](mutation_opt&& m) { if (m) { _ptables.push_back({std::move(*m), &reader}); } }); }).then([this] { boost::range::make_heap(_ptables, &heap_compare); boost::range::pop_heap(_ptables, &heap_compare); _inited = true; return next(); }); } return next(); } }; mutation_reader make_combined_reader(std::vector readers) { return make_mutation_reader(std::move(readers)); } class joining_reader final : public mutation_reader::impl { std::vector _readers; std::vector::iterator _current; public: joining_reader(std::vector readers) : _readers(std::move(readers)) , _current(_readers.begin()) { } joining_reader(joining_reader&&) = default; virtual future operator()() override { if (_current == _readers.end()) { return make_ready_future(stdx::nullopt); } return (*_current)().then([this] (mutation_opt m) { if (!m) { ++_current; return operator()(); } else { return make_ready_future(std::move(m)); } }); } }; mutation_reader make_combined_reader(mutation_reader&& a, mutation_reader&& b) { std::vector v; v.reserve(2); v.push_back(std::move(a)); v.push_back(std::move(b)); return make_combined_reader(std::move(v)); } mutation_reader make_joining_reader(std::vector readers) { return make_mutation_reader(std::move(readers)); } class lazy_reader final : public mutation_reader::impl { std::function _make_reader; stdx::optional _reader; public: lazy_reader(std::function make_reader) : _make_reader(std::move(make_reader)) { } virtual future operator()() override { if (!_reader) { _reader = _make_reader(); } return (*_reader)(); } }; mutation_reader make_lazy_reader(std::function make_reader) { return make_mutation_reader(std::move(make_reader)); } class reader_returning final : public mutation_reader::impl { mutation _m; bool _done = false; public: reader_returning(mutation m) : _m(std::move(m)) { } virtual future operator()() override { if (_done) { return make_ready_future(); } else { _done = true; return make_ready_future(std::move(_m)); } } }; mutation_reader make_reader_returning(mutation m) { return make_mutation_reader(std::move(m)); } class reader_returning_many final : public mutation_reader::impl { std::vector _m; bool _done = false; public: reader_returning_many(std::vector m) : _m(std::move(m)) { boost::range::reverse(_m); } virtual future operator()() override { if (_m.empty()) { return make_ready_future(); } auto m = std::move(_m.back()); _m.pop_back(); return make_ready_future(std::move(m)); } }; mutation_reader make_reader_returning_many(std::vector mutations) { return make_mutation_reader(std::move(mutations)); } class empty_reader final : public mutation_reader::impl { public: virtual future operator()() override { return make_ready_future(); } }; mutation_reader make_empty_reader() { return make_mutation_reader(); }