diff --git a/database.cc b/database.cc index e4d21f7b6c..c7aff6765e 100644 --- a/database.cc +++ b/database.cc @@ -176,6 +176,14 @@ make_memtable_reader(const memtable& mt) { }; } +// Convert an sstable to a mutation_reader +mutation_reader +make_sstable_reader(sstables::sstable& sst, schema_ptr schema) { + return [reader = make_lw_shared(sst.read_range_rows(std::move(schema), dht::minimum_token(), dht::maximum_token()))] () mutable { + return reader->read(); + }; +} + template future column_family::for_all_partitions(Func&& func) const { @@ -201,11 +209,12 @@ column_family::for_all_partitions(Func&& func) const { // mutation being merged from ptables std::experimental::optional current; lw_shared_ptr memtables; + lw_shared_ptr sstables; Func func; bool ok = true; bool done() const { return !ok || ptables.empty(); } iteration_state(const column_family& cf, Func&& func) - : memtables(cf._memtables), func(std::move(func)) { + : memtables(cf._memtables), sstables(cf._sstables), func(std::move(func)) { } }; iteration_state is(*this, std::move(func)); @@ -216,6 +225,9 @@ column_family::for_all_partitions(Func&& func) const { is.tables.emplace_back(make_memtable_reader(*mtp)); } } + for (auto sstp : *is.sstables | boost::adaptors::map_values) { + is.tables.emplace_back(make_sstable_reader(*sstp, _schema)); + } // Get first element from mutation_cursor, if any, and set up ptables return parallel_for_each(is.tables, [this, &is] (mutation_reader& mr) { return mr().then([this, &is, &mr] (mutation_opt&& m) {