db: futurize column_family::for_all_partitions() internal loop

Adapt for_all_partitions() to use futures instead of iterators,
as that will be the interface to sstables.  We drop use of nway_merger as
that is not able to use futures and instead open-code the heap
functionality.
This commit is contained in:
Avi Kivity
2015-05-26 13:21:09 +03:00
parent 33ac1922f3
commit 61d62dffe8

View File

@@ -24,6 +24,7 @@
#include "locator/simple_snitch.hh"
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/function_output_iterator.hpp>
#include <boost/range/algorithm/heap_algorithm.hpp>
#include "frozen_mutation.hh"
#include "mutation_partition_applier.hh"
#include "core/do_with.hh"
@@ -155,67 +156,112 @@ struct column_family::merge_comparator {
}
};
using mutation_reader = std::function<future<mutation_opt> ()>;
// Convert a memtable to a subscription<mutation>, which is what's expected by
// mutation_cursor (and provided by sstables).
mutation_reader
make_memtable_reader(const memtable& mt) {
auto begin = mt.all_partitions().begin();
auto end = mt.all_partitions().end();
return [begin, end, s = mt.schema()] () mutable {
if (begin != end) {
auto m = mutation(s, begin->first, begin->second);
++begin;
return make_ready_future<mutation_opt>(std::experimental::make_optional(std::move(m)));
} else {
return make_ready_future<mutation_opt>();
}
};
}
template <typename Func>
future<bool>
column_family::for_all_partitions(Func&& func) const {
static_assert(std::is_same<bool, std::result_of_t<Func(const dht::decorated_key&, const mutation_partition&)>>::value,
"bad Func signature");
using partitions_range = boost::iterator_range<memtable::partitions_type::const_iterator>;
// The plan here is to use a heap structure to sort incoming
// mutations from many mutation_queues, grab them in turn, and
// either merge them (if the keys are the same), or pass them
// to func (if not).
struct iteration_state {
std::vector<partitions_range> tables;
std::vector<partitions_range*> ptables;
nway_merger<std::vector<partitions_range*>, merge_comparator> merger;
std::experimental::optional<std::pair<dht::decorated_key, mutation_partition>> current;
std::vector<mutation_reader> tables;
struct mutation_and_reader {
mutation m;
mutation_reader* read;
};
std::vector<mutation_and_reader> ptables;
// comparison function for std::make_heap()/std::push_heap()
static bool heap_compare(const mutation_and_reader& a, const mutation_and_reader& b) {
auto&& s = a.m.schema();
// order of comparison is inverted, because heaps produce greatest value first
return b.m.decorated_key().less_compare(*s, a.m.decorated_key());
}
// mutation being merged from ptables
std::experimental::optional<mutation> current;
lw_shared_ptr<memtable_list> memtables;
Func func;
bool ok = true;
bool more = true;
bool done() const { return !(ok && more); }
bool done() const { return !ok || ptables.empty(); }
iteration_state(const column_family& cf, Func&& func)
: merger{merge_comparator(cf.schema())}, memtables(cf._memtables), func(std::move(func)) {
: memtables(cf._memtables), func(std::move(func)) {
}
};
iteration_state is(*this, std::move(func));
auto& tables = is.tables;
auto& ptables = is.ptables;
auto& merger = is.merger;
for (auto&& mtp : *is.memtables) {
tables.push_back(boost::make_iterator_range(mtp->all_partitions()));
}
for (auto&& r : tables) {
ptables.push_back(&r);
}
merger.create_heap(ptables);
// Can't use memtable::partitions_type::value_type due do constness
return do_with(std::move(is), [this] (iteration_state& is) {
return do_until(std::bind(&iteration_state::done, &is), [&is, this] {
auto& more = is.more;
auto& merger = is.merger;
more = merger.pop(boost::make_function_output_iterator([&] (const memtable::partitions_type::value_type& e) {
auto&& key = e.first;
auto&& mp = e.second;
auto& current = is.current;
auto& func = is.func;
auto& ok = is.ok;
// Schema cannot have different keys
if (current && !current->first.equal(*_schema, key)) {
ok = func(std::move(current->first), std::move(current->second));
current = std::experimental::nullopt;
for (auto mtp : *is.memtables) {
if (!mtp->empty()) {
is.tables.emplace_back(make_memtable_reader(*mtp));
}
}
// Get first element from mutation_cursor, if any, and set up ptables
return parallel_for_each(is.tables, [this, &is] (mutation_reader& mr) {
return mr().then([this, &is, &mr] (mutation_opt&& m) {
if (m) {
is.ptables.push_back({std::move(*m), &mr});
}
if (current) {
});
}).then([&is, this] {
boost::range::make_heap(is.ptables, &iteration_state::heap_compare);
return do_until(std::bind(&iteration_state::done, &is), [&is, this] {
if (!is.ptables.empty()) {
boost::range::pop_heap(is.ptables, &iteration_state::heap_compare);
auto& candidate_queue = is.ptables.back();
// Note: heap is now in invalid state, waiting for pop_back or push_heap,
// see below.
mutation& m = candidate_queue.m;
// FIXME: handle different schemas
current->second.apply(*_schema, mp);
if (is.current && !is.current->decorated_key().equal(*m.schema(), m.decorated_key())) {
// key has changed, so emit accumukated mutation
is.ok = is.func(is.current->decorated_key(), is.current->partition());
is.current = std::experimental::nullopt;
}
if (!is.current) {
is.current = std::move(m);
} else {
is.current->partition().apply(*m.schema(), m.partition());
}
return (*candidate_queue.read)().then([&is] (mutation_opt&& more) {
// Restore heap to valid state
if (!more) {
is.ptables.pop_back();
} else {
is.ptables.back().m = std::move(*more);
boost::range::push_heap(is.ptables, &iteration_state::heap_compare);
}
});
} else {
current = std::make_pair(key, mp);
return make_ready_future<>();
}
}));
return make_ready_future<>();
});
}).then([this, &is] {
auto& ok = is.ok;
auto& current = is.current;
auto& func = is.func;
if (ok && current) {
ok = func(std::move(current->first), std::move(current->second));
ok = func(std::move(current->decorated_key()), std::move(current->partition()));
current = std::experimental::nullopt;
}
return make_ready_future<bool>(ok);