Merge tag 'avi/readpath-prep/v1' from seastar-dev.git

From Avi:

"This patchset prepares for adding sstables to the read path.  Because sstables
involve I/O, their APIs return futures, which means that APIs that may call
those sstable APIs also need to return futures.

This patchset uses the two-space indent + do_with + reference aliases trick
to make patches more readable.  Cleanup patches will follow once it is merged."
This commit is contained in:
Tomasz Grabiec
2015-05-19 20:39:36 +02:00
8 changed files with 138 additions and 85 deletions

View File

@@ -25,6 +25,7 @@
#include <boost/function_output_iterator.hpp>
#include "frozen_mutation.hh"
#include "mutation_partition_applier.hh"
#include "core/do_with.hh"
thread_local logging::logger dblog("database");
@@ -39,6 +40,9 @@ column_family::column_family(schema_ptr schema, config config)
, _memtables({memtable(_schema)})
{ }
// define in .cc, since sstable is forward-declared in .hh
column_family::column_family(column_family&& x) = default;
// define in .cc, since sstable is forward-declared in .hh
column_family::~column_family() {
}
@@ -50,7 +54,7 @@ memtable::find_partition(const dht::decorated_key& key) const {
return i == partitions.end() ? const_mutation_partition_ptr() : std::make_unique<const mutation_partition>(i->second);
}
column_family::const_mutation_partition_ptr
future<column_family::const_mutation_partition_ptr>
column_family::find_partition(const dht::decorated_key& key) const {
// FIXME: optimize for 0 or 1 entries found case
mutation_partition ret(_schema);
@@ -63,30 +67,31 @@ column_family::find_partition(const dht::decorated_key& key) const {
}
}
if (any) {
return std::make_unique<mutation_partition>(std::move(ret));
return make_ready_future<const_mutation_partition_ptr>(std::make_unique<mutation_partition>(std::move(ret)));
} else {
return nullptr;
return make_ready_future<const_mutation_partition_ptr>();
}
}
column_family::const_mutation_partition_ptr
future<column_family::const_mutation_partition_ptr>
column_family::find_partition_slow(const partition_key& key) const {
return find_partition(dht::global_partitioner().decorate_key(*_schema, key));
}
column_family::const_row_ptr
column_family::find_row(const dht::decorated_key& partition_key, const clustering_key& clustering_key) const {
const_mutation_partition_ptr p = find_partition(partition_key);
if (!p) {
return nullptr;
}
auto r = p->find_row(clustering_key);
if (r) {
// FIXME: remove copy if only one data source
return std::make_unique<row>(*r);
} else {
return nullptr;
}
future<column_family::const_row_ptr>
column_family::find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const {
return find_partition(partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) {
if (!p) {
return make_ready_future<const_row_ptr>();
}
auto r = p->find_row(clustering_key);
if (r) {
// FIXME: remove copy if only one data source
return make_ready_future<const_row_ptr>(std::make_unique<row>(*r));
} else {
return make_ready_future<const_row_ptr>();
}
});
}
mutation_partition&
@@ -124,29 +129,46 @@ struct column_family::merge_comparator {
};
template <typename Func>
bool
future<bool>
column_family::for_all_partitions(Func&& func) const {
static_assert(std::is_same<bool, std::result_of_t<Func(const dht::decorated_key&, const mutation_partition&)>>::value,
"bad Func signature");
using partitions_range = boost::iterator_range<memtable::partitions_type::const_iterator>;
std::vector<partitions_range> tables;
struct iteration_state {
std::vector<partitions_range> tables;
std::vector<partitions_range*> ptables;
nway_merger<std::vector<partitions_range*>, merge_comparator> merger;
std::experimental::optional<std::pair<dht::decorated_key, mutation_partition>> current;
Func func;
bool ok = true;
bool more = true;
bool done() const { return !(ok && more); }
iteration_state(const column_family& cf, Func&& func)
: merger{merge_comparator(cf.schema())}, func(std::move(func)) {
}
};
iteration_state is(*this, std::move(func));
auto& tables = is.tables;
auto& ptables = is.ptables;
auto& merger = is.merger;
for (auto&& mt : _memtables) {
tables.push_back(boost::make_iterator_range(mt.all_partitions()));
}
std::vector<partitions_range*> ptables;
for (auto&& r : tables) {
ptables.push_back(&r);
}
nway_merger<std::vector<partitions_range*>, merge_comparator> merger{merge_comparator(_schema)};
merger.create_heap(ptables);
bool ok = true;
bool more = true;
// Can't use memtable::partitions_type::value_type due do constness
std::experimental::optional<std::pair<dht::decorated_key, mutation_partition>> current;
while (ok && more) {
return do_with(std::move(is), [this] (iteration_state& is) {
return do_until(std::bind(&iteration_state::done, &is), [&is, this] {
auto& more = is.more;
auto& merger = is.merger;
more = merger.pop(boost::make_function_output_iterator([&] (const memtable::partitions_type::value_type& e) {
auto&& key = e.first;
auto&& mp = e.second;
auto& current = is.current;
auto& func = is.func;
auto& ok = is.ok;
// Schema cannot have different keys
if (current && !current->first.equal(*_schema, key)) {
ok = func(std::move(current->first), std::move(current->second));
@@ -159,15 +181,21 @@ column_family::for_all_partitions(Func&& func) const {
current = std::make_pair(key, mp);
}
}));
}
if (ok && current) {
ok = func(std::move(current->first), std::move(current->second));
current = std::experimental::nullopt;
}
return ok;
return make_ready_future<>();
}).then([this, &is] {
auto& ok = is.ok;
auto& current = is.current;
auto& func = is.func;
if (ok && current) {
ok = func(std::move(current->first), std::move(current->second));
current = std::experimental::nullopt;
}
return make_ready_future<bool>(ok);
});
});
}
bool
future<bool>
column_family::for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
return for_all_partitions(std::move(func));
}
@@ -284,11 +312,13 @@ void
column_family::seal_active_memtable() {
auto& old = _memtables.back();
_memtables.emplace_back(_schema);
sstring name = sprint("%s/%s-%s-%d.%d-Data.db",
// FIXME: better way of ensuring we don't attemt to
// overwrite an existing table.
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
sstring name = sprint("%s/%s-%s-%d-Data.db",
_config.datadir,
_schema->ks_name(), _schema->cf_name(),
engine().cpu_id(),
_sstable_generation++);
gen);
if (!_config.enable_disk_writes) {
return;
}
@@ -649,27 +679,46 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
return 0;
}
struct query_state {
explicit query_state(const query::read_command& cmd)
: cmd(cmd)
, builder(cmd.slice)
, limit(cmd.row_limit)
, current_partition_range(cmd.partition_ranges.begin()) {
}
const query::read_command& cmd;
query::result::builder builder;
uint32_t limit;
std::vector<query::partition_range>::const_iterator current_partition_range;
bool done() const {
return !limit || current_partition_range == cmd.partition_ranges.end();
}
};
future<lw_shared_ptr<query::result>>
column_family::query(const query::read_command& cmd) const {
query::result::builder builder(cmd.slice);
uint32_t limit = cmd.row_limit;
for (auto&& range : cmd.partition_ranges) {
if (limit == 0) {
break;
}
return do_with(query_state(cmd), [this] (query_state& qs) {
return do_until(std::bind(&query_state::done, &qs), [this, &qs] {
auto& cmd = qs.cmd;
auto& builder = qs.builder;
auto& limit = qs.limit;
auto&& range = *qs.current_partition_range++;
if (range.is_singular()) {
auto& key = range.start_value();
auto partition = find_partition_slow(key);
auto& key = range.start_value();
return find_partition_slow(key).then([this, &qs, &key] (auto partition) {
auto& cmd = qs.cmd;
auto& builder = qs.builder;
auto& limit = qs.limit;
if (!partition) {
continue;
return;
}
auto p_builder = builder.add_partition(key);
partition->query(*_schema, cmd.slice, limit, p_builder);
p_builder.finish();
limit -= p_builder.row_count();
});
} else if (range.is_full()) {
for_all_partitions([&] (const dht::decorated_key& dk, const mutation_partition& partition) {
return for_all_partitions([&] (const dht::decorated_key& dk, const mutation_partition& partition) {
auto p_builder = builder.add_partition(dk._key);
partition.query(*_schema, cmd.slice, limit, p_builder);
p_builder.finish();
@@ -678,13 +727,16 @@ column_family::query(const query::read_command& cmd) const {
return false;
}
return true;
});
}).discard_result();
} else {
fail(unimplemented::cause::RANGE_QUERIES);
}
}
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(builder.build()));
return make_ready_future<>();
}).then([&qs] {
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(qs.builder.build()));
});
});
}
future<lw_shared_ptr<query::result>>
@@ -713,13 +765,7 @@ std::ostream& operator<<(std::ostream& os, const mutation& m) {
}
std::ostream& operator<<(std::ostream& out, const column_family& cf) {
out << "{\n";
cf.for_all_partitions([&] (const dht::decorated_key& key, const mutation_partition& mp) {
out << key << " => " << mp << "\n";
return true;
});
out << "}";
return out;
return fprint(out, "{column_family: %s/%s}", cf._schema->ks_name(), cf._schema->cf_name());
}
std::ostream& operator<<(std::ostream& out, const database& db) {

View File

@@ -84,12 +84,12 @@ public:
using const_row_ptr = std::unique_ptr<const row>;
public:
column_family(schema_ptr schema, config cfg);
column_family(column_family&&) = default;
column_family(column_family&&);
~column_family();
schema_ptr schema() const { return _schema; }
const_mutation_partition_ptr find_partition(const dht::decorated_key& key) const;
const_mutation_partition_ptr find_partition_slow(const partition_key& key) const;
const_row_ptr find_row(const dht::decorated_key& partition_key, const clustering_key& clustering_key) const;
future<const_mutation_partition_ptr> find_partition(const dht::decorated_key& key) const;
future<const_mutation_partition_ptr> find_partition_slow(const partition_key& key) const;
future<const_row_ptr> find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const;
void apply(const frozen_mutation& m);
void apply(const mutation& m);
// Returns at most "cmd.limit" rows
@@ -103,13 +103,13 @@ private:
// so that iteration can be stopped by returning false.
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
template <typename Func>
bool for_all_partitions(Func&& func) const;
future<bool> for_all_partitions(Func&& func) const;
future<> probe_file(sstring sstdir, sstring fname);
void seal_on_overflow();
public:
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
bool for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
future<bool> for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
};

View File

@@ -345,7 +345,7 @@ public:
};
future<mutation_opt>
sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) {
sstables::sstable::read_row(schema_ptr schema, const sstables::key& key) {
assert(schema);

View File

@@ -91,7 +91,7 @@ public:
_generation = generation;
}
future<mutation_opt> convert_row(schema_ptr schema, const key& k);
future<mutation_opt> read_row(schema_ptr schema, const key& k);
private:
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;

View File

@@ -126,9 +126,9 @@ public:
column_name = std::move(column_name),
expected = std::move(expected),
table_name = std::move(table_name)] (database& db) {
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
auto p = cf.find_partition_slow(pkey);
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
return cf.find_partition_slow(pkey).then([schema, ck, column_name, expected] (column_family::const_mutation_partition_ptr p) {
assert(p != nullptr);
auto row = p->find_row(clustering_key::from_deeply_exploded(*schema, ck));
assert(row != nullptr);
@@ -150,6 +150,7 @@ public:
serialization_format::internal());
}
assert(col_def->type->equal(actual, col_def->type->decompose(expected)));
});
});
}

View File

@@ -8,6 +8,7 @@
#include "core/sstring.hh"
#include "database.hh"
#include "utils/UUID_gen.hh"
#include "core/do_with.hh"
#include <random>
static sstring some_keyspace("ks");
@@ -228,14 +229,15 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
insert_row(1003, 2003);
auto verify_row = [&] (int32_t c1, int32_t r1) {
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
auto r = cf.find_row(dht::global_partitioner().decorate_key(*s, key), c_key);
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) {
BOOST_REQUIRE(r);
auto i = r->find_cell(r1_col.id);
BOOST_REQUIRE(i);
auto cell = i->as_atomic_cell();
BOOST_REQUIRE(cell.is_live());
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1)));
});
};
verify_row(1001, 2001);
verify_row(1002, 2002);
@@ -244,13 +246,13 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
}
SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
column_family::config cfg;
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
column_family cf(s, cfg);
column_family::config cfg;
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
return do_with(column_family(s, cfg), [s] (column_family& cf) {
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
const column_definition& r1_col = *s->get_column_definition("r1");
@@ -275,7 +277,8 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
cf.seal_active_memtable();
}
cf.for_all_partitions_slow([&] (const dht::decorated_key& pk, const mutation_partition& mp) {
return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) {
return cf.for_all_partitions_slow([&] (const dht::decorated_key& pk, const mutation_partition& mp) {
auto p1 = boost::any_cast<int32_t>(int32_type->deserialize(pk._key.explode(*s)[0]));
for (const rows_entry& re : mp.range(*s, query::range<clustering_key_prefix>())) {
auto c1 = boost::any_cast<int32_t>(int32_type->deserialize(re.key().explode(*s)[0]));
@@ -285,9 +288,11 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
}
}
return true;
}).then([&result, shadow] (bool ok) {
BOOST_REQUIRE(shadow == result);
});
});
BOOST_REQUIRE(shadow == result);
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(test_cell_ordering) {

View File

@@ -18,7 +18,7 @@ SEASTAR_TEST_CASE(nonexistent_key) {
return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([] (auto sstp) {
return do_with(key::from_bytes(to_bytes("invalid_key")), [sstp] (auto& key) {
auto s = uncompressed_schema();
return sstp->convert_row(s, key).then([sstp, s, &key] (auto mutation) {
return sstp->read_row(s, key).then([sstp, s, &key] (auto mutation) {
BOOST_REQUIRE(!mutation);
return make_ready_future<>();
});
@@ -114,7 +114,7 @@ future<> test_no_clustered(bytes&& key, std::unordered_map<bytes, boost::any> &&
return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([k = std::move(key), map = std::move(map)] (auto sstp) mutable {
return do_with(sstables::key(std::move(k)), [sstp, map = std::move(map)] (auto& key) {
auto s = uncompressed_schema();
return sstp->convert_row(s, key).then([sstp, s, &key, map = std::move(map)] (auto mutation) {
return sstp->read_row(s, key).then([sstp, s, &key, map = std::move(map)] (auto mutation) {
BOOST_REQUIRE(mutation);
auto& mp = mutation->partition();
for (auto&& e : mp.range(*s, query::range<clustering_key_prefix>())) {
@@ -180,7 +180,7 @@ future<mutation> generate_clustered(bytes&& key) {
return reusable_sst("tests/urchin/sstables/complex", Generation).then([k = std::move(key)] (auto sstp) mutable {
return do_with(sstables::key(std::move(k)), [sstp] (auto& key) {
auto s = complex_schema();
return sstp->convert_row(s, key).then([sstp, s, &key] (auto mutation) {
return sstp->read_row(s, key).then([sstp, s, &key] (auto mutation) {
BOOST_REQUIRE(mutation);
return std::move(*mutation);
});

View File

@@ -126,7 +126,6 @@ public:
dk = std::move(dk),
column_parent = std::move(column_parent),
predicate = std::move(predicate)] (database& db) {
std::vector<ColumnOrSuperColumn> ret;
if (!column_parent.super_column.empty()) {
throw unimplemented_exception();
}
@@ -134,8 +133,9 @@ public:
if (predicate.__isset.column_names) {
throw unimplemented_exception();
} else if (predicate.__isset.slice_range) {
auto&& range = predicate.slice_range;
column_family::const_row_ptr rw = cf.find_row(dk, clustering_key::make_empty(*cf.schema()));
auto&& range = predicate.slice_range;
return cf.find_row(dk, clustering_key::make_empty(*cf.schema())).then([&cf, range = std::move(range)] (column_family::const_row_ptr rw) {
std::vector<ColumnOrSuperColumn> ret;
if (rw) {
auto beg = cf.schema()->regular_begin();
if (!range.start.empty()) {
@@ -165,6 +165,7 @@ public:
}
}
return make_foreign(make_lw_shared(std::move(ret)));
});
} else {
throw make_exception<InvalidRequestException>("empty SlicePredicate");
}