db: futurize the single partition query path

Prepare for disk reads.
This commit is contained in:
Avi Kivity
2015-05-18 19:45:39 +03:00
parent 738be63b28
commit db04bba208
5 changed files with 95 additions and 66 deletions

View File

@@ -54,7 +54,7 @@ memtable::find_partition(const dht::decorated_key& key) const {
return i == partitions.end() ? const_mutation_partition_ptr() : std::make_unique<const mutation_partition>(i->second);
}
column_family::const_mutation_partition_ptr
future<column_family::const_mutation_partition_ptr>
column_family::find_partition(const dht::decorated_key& key) const {
// FIXME: optimize for 0 or 1 entries found case
mutation_partition ret(_schema);
@@ -67,30 +67,31 @@ column_family::find_partition(const dht::decorated_key& key) const {
}
}
if (any) {
return std::make_unique<mutation_partition>(std::move(ret));
return make_ready_future<const_mutation_partition_ptr>(std::make_unique<mutation_partition>(std::move(ret)));
} else {
return nullptr;
return make_ready_future<const_mutation_partition_ptr>();
}
}
column_family::const_mutation_partition_ptr
future<column_family::const_mutation_partition_ptr>
column_family::find_partition_slow(const partition_key& key) const {
return find_partition(dht::global_partitioner().decorate_key(*_schema, key));
}
column_family::const_row_ptr
column_family::find_row(const dht::decorated_key& partition_key, const clustering_key& clustering_key) const {
const_mutation_partition_ptr p = find_partition(partition_key);
if (!p) {
return nullptr;
}
auto r = p->find_row(clustering_key);
if (r) {
// FIXME: remove copy if only one data source
return std::make_unique<row>(*r);
} else {
return nullptr;
}
future<column_family::const_row_ptr>
column_family::find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const {
return find_partition(partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) {
if (!p) {
return make_ready_future<const_row_ptr>();
}
auto r = p->find_row(clustering_key);
if (r) {
// FIXME: remove copy if only one data source
return make_ready_future<const_row_ptr>(std::make_unique<row>(*r));
} else {
return make_ready_future<const_row_ptr>();
}
});
}
mutation_partition&
@@ -128,29 +129,46 @@ struct column_family::merge_comparator {
};
template <typename Func>
bool
future<bool>
column_family::for_all_partitions(Func&& func) const {
static_assert(std::is_same<bool, std::result_of_t<Func(const dht::decorated_key&, const mutation_partition&)>>::value,
"bad Func signature");
using partitions_range = boost::iterator_range<memtable::partitions_type::const_iterator>;
std::vector<partitions_range> tables;
struct iteration_state {
std::vector<partitions_range> tables;
std::vector<partitions_range*> ptables;
nway_merger<std::vector<partitions_range*>, merge_comparator> merger;
std::experimental::optional<std::pair<dht::decorated_key, mutation_partition>> current;
Func func;
bool ok = true;
bool more = true;
bool done() const { return !(ok && more); }
iteration_state(const column_family& cf, Func&& func)
: merger{merge_comparator(cf.schema())}, func(std::move(func)) {
}
};
iteration_state is(*this, std::move(func));
auto& tables = is.tables;
auto& ptables = is.ptables;
auto& merger = is.merger;
for (auto&& mt : _memtables) {
tables.push_back(boost::make_iterator_range(mt.all_partitions()));
}
std::vector<partitions_range*> ptables;
for (auto&& r : tables) {
ptables.push_back(&r);
}
nway_merger<std::vector<partitions_range*>, merge_comparator> merger{merge_comparator(_schema)};
merger.create_heap(ptables);
bool ok = true;
bool more = true;
// Can't use memtable::partitions_type::value_type due do constness
std::experimental::optional<std::pair<dht::decorated_key, mutation_partition>> current;
while (ok && more) {
return do_with(std::move(is), [this] (iteration_state& is) {
return do_until(std::bind(&iteration_state::done, &is), [&is, this] {
auto& more = is.more;
auto& merger = is.merger;
more = merger.pop(boost::make_function_output_iterator([&] (const memtable::partitions_type::value_type& e) {
auto&& key = e.first;
auto&& mp = e.second;
auto& current = is.current;
auto& func = is.func;
auto& ok = is.ok;
// Schema cannot have different keys
if (current && !current->first.equal(*_schema, key)) {
ok = func(std::move(current->first), std::move(current->second));
@@ -163,15 +181,21 @@ column_family::for_all_partitions(Func&& func) const {
current = std::make_pair(key, mp);
}
}));
}
if (ok && current) {
ok = func(std::move(current->first), std::move(current->second));
current = std::experimental::nullopt;
}
return ok;
return make_ready_future<>();
}).then([this, &is] {
auto& ok = is.ok;
auto& current = is.current;
auto& func = is.func;
if (ok && current) {
ok = func(std::move(current->first), std::move(current->second));
current = std::experimental::nullopt;
}
return make_ready_future<bool>(ok);
});
});
}
bool
future<bool>
column_family::for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
return for_all_partitions(std::move(func));
}
@@ -676,17 +700,21 @@ column_family::query(const query::read_command& cmd) const {
auto& limit = qs.limit;
auto&& range = *qs.current_partition_range++;
if (range.is_singular()) {
auto& key = range.start_value();
auto partition = find_partition_slow(key);
auto& key = range.start_value();
return find_partition_slow(key).then([this, &qs, &key] (auto partition) {
auto& cmd = qs.cmd;
auto& builder = qs.builder;
auto& limit = qs.limit;
if (!partition) {
return make_ready_future<>();
return;
}
auto p_builder = builder.add_partition(key);
partition->query(*_schema, cmd.slice, limit, p_builder);
p_builder.finish();
limit -= p_builder.row_count();
});
} else if (range.is_full()) {
for_all_partitions([&] (const dht::decorated_key& dk, const mutation_partition& partition) {
return for_all_partitions([&] (const dht::decorated_key& dk, const mutation_partition& partition) {
auto p_builder = builder.add_partition(dk._key);
partition.query(*_schema, cmd.slice, limit, p_builder);
p_builder.finish();
@@ -695,7 +723,7 @@ column_family::query(const query::read_command& cmd) const {
return false;
}
return true;
});
}).discard_result();
} else {
fail(unimplemented::cause::RANGE_QUERIES);
}
@@ -733,13 +761,7 @@ std::ostream& operator<<(std::ostream& os, const mutation& m) {
}
std::ostream& operator<<(std::ostream& out, const column_family& cf) {
out << "{\n";
cf.for_all_partitions([&] (const dht::decorated_key& key, const mutation_partition& mp) {
out << key << " => " << mp << "\n";
return true;
});
out << "}";
return out;
return fprint(out, "{column_family: %s/%s}", cf._schema->ks_name(), cf._schema->cf_name());
}
std::ostream& operator<<(std::ostream& out, const database& db) {

View File

@@ -88,9 +88,9 @@ public:
column_family(column_family&&);
~column_family();
schema_ptr schema() const { return _schema; }
const_mutation_partition_ptr find_partition(const dht::decorated_key& key) const;
const_mutation_partition_ptr find_partition_slow(const partition_key& key) const;
const_row_ptr find_row(const dht::decorated_key& partition_key, const clustering_key& clustering_key) const;
future<const_mutation_partition_ptr> find_partition(const dht::decorated_key& key) const;
future<const_mutation_partition_ptr> find_partition_slow(const partition_key& key) const;
future<const_row_ptr> find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const;
void apply(const frozen_mutation& m);
void apply(const mutation& m);
// Returns at most "cmd.limit" rows
@@ -104,13 +104,13 @@ private:
// so that iteration can be stopped by returning false.
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
template <typename Func>
bool for_all_partitions(Func&& func) const;
future<bool> for_all_partitions(Func&& func) const;
future<> probe_file(sstring sstdir, sstring fname);
void seal_on_overflow();
public:
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
bool for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
future<bool> for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
};

View File

@@ -129,9 +129,9 @@ public:
column_name = std::move(column_name),
expected = std::move(expected),
table_name = std::move(table_name)] (database& db) {
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
auto p = cf.find_partition_slow(pkey);
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
return cf.find_partition_slow(pkey).then([schema, ck, column_name, expected] (column_family::const_mutation_partition_ptr p) {
assert(p != nullptr);
auto row = p->find_row(clustering_key::from_deeply_exploded(*schema, ck));
assert(row != nullptr);
@@ -153,6 +153,7 @@ public:
serialization_format::internal());
}
assert(col_def->type->equal(actual, col_def->type->decompose(expected)));
});
});
}

View File

@@ -8,6 +8,7 @@
#include "core/sstring.hh"
#include "database.hh"
#include "utils/UUID_gen.hh"
#include "core/do_with.hh"
#include <random>
static sstring some_keyspace("ks");
@@ -228,14 +229,15 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
insert_row(1003, 2003);
auto verify_row = [&] (int32_t c1, int32_t r1) {
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
auto r = cf.find_row(dht::global_partitioner().decorate_key(*s, key), c_key);
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) {
BOOST_REQUIRE(r);
auto i = r->find_cell(r1_col.id);
BOOST_REQUIRE(i);
auto cell = i->as_atomic_cell();
BOOST_REQUIRE(cell.is_live());
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1)));
});
};
verify_row(1001, 2001);
verify_row(1002, 2002);
@@ -244,13 +246,13 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
}
SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
column_family::config cfg;
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
column_family cf(s, cfg);
column_family::config cfg;
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
return do_with(column_family(s, cfg), [s] (column_family& cf) {
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
const column_definition& r1_col = *s->get_column_definition("r1");
@@ -275,7 +277,8 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
cf.seal_active_memtable();
}
cf.for_all_partitions_slow([&] (const dht::decorated_key& pk, const mutation_partition& mp) {
return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) {
return cf.for_all_partitions_slow([&] (const dht::decorated_key& pk, const mutation_partition& mp) {
auto p1 = boost::any_cast<int32_t>(int32_type->deserialize(pk._key.explode(*s)[0]));
for (const rows_entry& re : mp.range(*s, query::range<clustering_key_prefix>())) {
auto c1 = boost::any_cast<int32_t>(int32_type->deserialize(re.key().explode(*s)[0]));
@@ -285,9 +288,11 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
}
}
return true;
}).then([&result, shadow] (bool ok) {
BOOST_REQUIRE(shadow == result);
});
});
BOOST_REQUIRE(shadow == result);
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(test_cell_ordering) {

View File

@@ -126,7 +126,6 @@ public:
dk = std::move(dk),
column_parent = std::move(column_parent),
predicate = std::move(predicate)] (database& db) {
std::vector<ColumnOrSuperColumn> ret;
if (!column_parent.super_column.empty()) {
throw unimplemented_exception();
}
@@ -134,8 +133,9 @@ public:
if (predicate.__isset.column_names) {
throw unimplemented_exception();
} else if (predicate.__isset.slice_range) {
auto&& range = predicate.slice_range;
column_family::const_row_ptr rw = cf.find_row(dk, clustering_key::make_empty(*cf.schema()));
auto&& range = predicate.slice_range;
return cf.find_row(dk, clustering_key::make_empty(*cf.schema())).then([&cf, range = std::move(range)] (column_family::const_row_ptr rw) {
std::vector<ColumnOrSuperColumn> ret;
if (rw) {
auto beg = cf.schema()->regular_begin();
if (!range.start.empty()) {
@@ -165,6 +165,7 @@ public:
}
}
return make_foreign(make_lw_shared(std::move(ret)));
});
} else {
throw make_exception<InvalidRequestException>("empty SlicePredicate");
}