diff --git a/database.cc b/database.cc index 6b85a9627a..2cae35b18a 100644 --- a/database.cc +++ b/database.cc @@ -24,6 +24,7 @@ #include "locator/simple_snitch.hh" #include #include +#include #include "frozen_mutation.hh" #include "mutation_partition_applier.hh" #include "core/do_with.hh" @@ -155,67 +156,112 @@ struct column_family::merge_comparator { } }; + +using mutation_reader = std::function ()>; + +// Convert a memtable to a subscription, which is what's expected by +// mutation_cursor (and provided by sstables). +mutation_reader +make_memtable_reader(const memtable& mt) { + auto begin = mt.all_partitions().begin(); + auto end = mt.all_partitions().end(); + return [begin, end, s = mt.schema()] () mutable { + if (begin != end) { + auto m = mutation(s, begin->first, begin->second); + ++begin; + return make_ready_future(std::experimental::make_optional(std::move(m))); + } else { + return make_ready_future(); + } + }; +} + template future column_family::for_all_partitions(Func&& func) const { static_assert(std::is_same>::value, "bad Func signature"); - using partitions_range = boost::iterator_range; + // The plan here is to use a heap structure to sort incoming + // mutations from many mutation_queues, grab them in turn, and + // either merge them (if the keys are the same), or pass them + // to func (if not). 
struct iteration_state { - std::vector tables; - std::vector ptables; - nway_merger, merge_comparator> merger; - std::experimental::optional> current; + std::vector tables; + struct mutation_and_reader { + mutation m; + mutation_reader* read; + }; + std::vector ptables; + // comparison function for std::make_heap()/std::push_heap() + static bool heap_compare(const mutation_and_reader& a, const mutation_and_reader& b) { + auto&& s = a.m.schema(); + // order of comparison is inverted, because heaps produce greatest value first + return b.m.decorated_key().less_compare(*s, a.m.decorated_key()); + } + // mutation being merged from ptables + std::experimental::optional current; lw_shared_ptr memtables; Func func; bool ok = true; - bool more = true; - bool done() const { return !(ok && more); } + bool done() const { return !ok || ptables.empty(); } iteration_state(const column_family& cf, Func&& func) - : merger{merge_comparator(cf.schema())}, memtables(cf._memtables), func(std::move(func)) { + : memtables(cf._memtables), func(std::move(func)) { } }; iteration_state is(*this, std::move(func)); - auto& tables = is.tables; - auto& ptables = is.ptables; - auto& merger = is.merger; - for (auto&& mtp : *is.memtables) { - tables.push_back(boost::make_iterator_range(mtp->all_partitions())); - } - for (auto&& r : tables) { - ptables.push_back(&r); - } - merger.create_heap(ptables); // Can't use memtable::partitions_type::value_type due do constness return do_with(std::move(is), [this] (iteration_state& is) { - return do_until(std::bind(&iteration_state::done, &is), [&is, this] { - auto& more = is.more; - auto& merger = is.merger; - more = merger.pop(boost::make_function_output_iterator([&] (const memtable::partitions_type::value_type& e) { - auto&& key = e.first; - auto&& mp = e.second; - auto& current = is.current; - auto& func = is.func; - auto& ok = is.ok; - // Schema cannot have different keys - if (current && !current->first.equal(*_schema, key)) { - ok = 
func(std::move(current->first), std::move(current->second)); - current = std::experimental::nullopt; + for (auto mtp : *is.memtables) { + if (!mtp->empty()) { + is.tables.emplace_back(make_memtable_reader(*mtp)); + } + } + // Get first element from mutation_cursor, if any, and set up ptables + return parallel_for_each(is.tables, [this, &is] (mutation_reader& mr) { + return mr().then([this, &is, &mr] (mutation_opt&& m) { + if (m) { + is.ptables.push_back({std::move(*m), &mr}); } - if (current) { + }); + }).then([&is, this] { + boost::range::make_heap(is.ptables, &iteration_state::heap_compare); + return do_until(std::bind(&iteration_state::done, &is), [&is, this] { + if (!is.ptables.empty()) { + boost::range::pop_heap(is.ptables, &iteration_state::heap_compare); + auto& candidate_queue = is.ptables.back(); + // Note: heap is now in invalid state, waiting for pop_back or push_heap, + // see below. + mutation& m = candidate_queue.m; // FIXME: handle different schemas - current->second.apply(*_schema, mp); + if (is.current && !is.current->decorated_key().equal(*m.schema(), m.decorated_key())) { + // key has changed, so emit accumulated mutation + is.ok = is.func(is.current->decorated_key(), is.current->partition()); + is.current = std::experimental::nullopt; + } + if (!is.current) { + is.current = std::move(m); + } else { + is.current->partition().apply(*m.schema(), m.partition()); + } + return (*candidate_queue.read)().then([&is] (mutation_opt&& more) { + // Restore heap to valid state + if (!more) { + is.ptables.pop_back(); + } else { + is.ptables.back().m = std::move(*more); + boost::range::push_heap(is.ptables, &iteration_state::heap_compare); + } + }); } else { - current = std::make_pair(key, mp); + return make_ready_future<>(); } - })); - return make_ready_future<>(); + }); }).then([this, &is] { auto& ok = is.ok; auto& current = is.current; auto& func = is.func; if (ok && current) { - ok = func(std::move(current->first), std::move(current->second)); + ok = 
func(std::move(current->decorated_key()), std::move(current->partition())); current = std::experimental::nullopt; } return make_ready_future(ok);