db: wire up sstable read path for single partition lookups

Untested, as we never add sstables to the read set yet.
This commit is contained in:
Avi Kivity
2015-05-20 17:53:35 +03:00
committed by Tomasz Grabiec
parent 1c7ed0a86c
commit 32c274f69e

View File

@@ -20,6 +20,7 @@
#include <boost/algorithm/string/split.hpp>
#include "sstables/sstables.hh"
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/map.hpp>
#include "locator/simple_snitch.hh"
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/function_output_iterator.hpp>
@@ -60,8 +61,20 @@ memtable::find_partition(const dht::decorated_key& key) const {
future<column_family::const_mutation_partition_ptr>
column_family::find_partition(const dht::decorated_key& key) const {
// FIXME: optimize for 0 or 1 entries found case
mutation_partition ret(_schema);
bool any = false;
struct find_state {
sstables::key key;
mutation_partition ret;
bool any = false;
lw_shared_ptr<sstable_list> sstables; // protect from concurrent sstable removal
find_state(const column_family& cf, const dht::decorated_key& key)
: key(sstables::key::from_partition_key(*cf._schema, key._key))
, ret(cf._schema)
, sstables(cf._sstables) {
}
};
find_state fs(*this, key);
auto& ret = fs.ret;
bool& any = fs.any;
for (auto&& mtp : *_memtables) {
auto mp = mtp->find_partition(key);
if (mp) {
@@ -69,11 +82,22 @@ column_family::find_partition(const dht::decorated_key& key) const {
any = true;
}
}
if (any) {
return make_ready_future<const_mutation_partition_ptr>(std::make_unique<mutation_partition>(std::move(ret)));
} else {
return make_ready_future<const_mutation_partition_ptr>();
}
return do_with(std::move(fs), [this] (find_state& fs) {
return parallel_for_each(*fs.sstables | boost::adaptors::map_values, [this, &fs] (lw_shared_ptr<sstables::sstable> sstable) {
return sstable->read_row(_schema, fs.key).then([&fs] (mutation_opt mo) {
if (mo) {
fs.ret.apply(*mo->schema(), mo->partition());
fs.any = true;
}
});
}).then([&fs] {
if (fs.any) {
return make_ready_future<const_mutation_partition_ptr>(std::make_unique<mutation_partition>(std::move(fs.ret)));
} else {
return make_ready_future<const_mutation_partition_ptr>();
}
});
});
}
future<column_family::const_mutation_partition_ptr>