db: add sstables to the range scan read path

This commit is contained in:
Avi Kivity
2015-05-26 15:42:58 +03:00
parent 062eaab809
commit a71c287c10

View File

@@ -176,6 +176,14 @@ make_memtable_reader(const memtable& mt) {
};
}
// Convert an sstable to a mutation_reader
mutation_reader
make_sstable_reader(sstables::sstable& sst, schema_ptr schema) {
return [reader = make_lw_shared(sst.read_range_rows(std::move(schema), dht::minimum_token(), dht::maximum_token()))] () mutable {
return reader->read();
};
}
template <typename Func>
future<bool>
column_family::for_all_partitions(Func&& func) const {
@@ -201,11 +209,12 @@ column_family::for_all_partitions(Func&& func) const {
// mutation being merged from ptables
std::experimental::optional<mutation> current;
lw_shared_ptr<memtable_list> memtables;
lw_shared_ptr<sstable_list> sstables;
Func func;
bool ok = true;
bool done() const { return !ok || ptables.empty(); }
iteration_state(const column_family& cf, Func&& func)
: memtables(cf._memtables), func(std::move(func)) {
: memtables(cf._memtables), sstables(cf._sstables), func(std::move(func)) {
}
};
iteration_state is(*this, std::move(func));
@@ -216,6 +225,9 @@ column_family::for_all_partitions(Func&& func) const {
is.tables.emplace_back(make_memtable_reader(*mtp));
}
}
for (auto sstp : *is.sstables | boost::adaptors::map_values) {
is.tables.emplace_back(make_sstable_reader(*sstp, _schema));
}
// Get first element from mutation_cursor, if any, and set up ptables
return parallel_for_each(is.tables, [this, &is] (mutation_reader& mr) {
return mr().then([this, &is, &mr] (mutation_opt&& m) {