Merge "Add tests for query interface on mutation level" from Tomasz

This commit is contained in:
Avi Kivity
2015-07-02 16:35:27 +03:00
16 changed files with 279 additions and 52 deletions

View File

@@ -509,6 +509,7 @@ urchin_core = (['database.cc',
urchin_tests_dependencies = urchin_core + http + api + [
'tests/urchin/cql_test_env.cc',
'tests/urchin/cql_assertions.cc',
'tests/urchin/result_set_assertions.cc',
]
deps = {

View File

@@ -1019,7 +1019,7 @@ column_family::query(const query::read_command& cmd, const std::vector<query::pa
return qs.reader().then([this, &qs](mutation_opt mo) {
if (mo) {
auto p_builder = qs.builder.add_partition(mo->key());
mo->partition().query(*_schema, qs.cmd.slice, qs.limit, p_builder);
mo->partition().query(p_builder, *_schema, qs.cmd.slice, qs.limit);
p_builder.finish();
qs.limit -= p_builder.row_count();
} else {

View File

@@ -1077,11 +1077,7 @@ query(service::storage_proxy& proxy, const sstring& cf_name) {
query::partition_slice slice{{query::clustering_range::make_open_ended_both_sides()}, static_cols, regular_cols, opts};
auto cmd = make_lw_shared<query::read_command>(schema->id(), slice, std::numeric_limits<uint32_t>::max());
return proxy.query(cmd, {query::full_partition_range}, db::consistency_level::ONE).then([schema, cmd] (auto&& result) {
query::result_set_builder builder{schema};
bytes_ostream w(result->buf());
query::result_view view(w.linearize());
view.consume(cmd->slice, builder);
return builder.build();
return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result));
});
}

View File

@@ -98,3 +98,12 @@ bool mutation::operator==(const mutation& m) const {
bool mutation::operator!=(const mutation& m) const {
return !(*this == m);
}
query::result
mutation::query(const query::partition_slice& slice, uint32_t row_limit) const {
query::result::builder builder(slice);
auto pb = builder.add_partition(key());
_p.query(pb, *_schema, slice, row_limit);
pb.finish();
return builder.build();
}

View File

@@ -41,6 +41,8 @@ public:
const utils::UUID& column_family_id() const { return _schema->id(); }
bool operator==(const mutation&) const;
bool operator!=(const mutation&) const;
public:
query::result query(const query::partition_slice&, uint32_t row_limit = query::max_rows) const;
private:
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
};

View File

@@ -270,10 +270,10 @@ bool has_any_live_data(const row& cells, tombstone tomb, ColumnDefResolver&& id_
}
void
mutation_partition::query(const schema& s,
mutation_partition::query(query::result::partition_writer& pw,
const schema& s,
const query::partition_slice& slice,
uint32_t limit,
query::result::partition_writer& pw) const
uint32_t limit) const
{
auto regular_column_resolver = [&s] (column_id id) -> const column_definition& {
return s.regular_column_at(id);

View File

@@ -273,5 +273,5 @@ public:
tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;
boost::iterator_range<rows_type::const_iterator> range(const schema& schema, const query::range<clustering_key_prefix>& r) const;
// Returns at most "limit" rows. The limit must be greater than 0.
void query(const schema& s, const query::partition_slice& slice, uint32_t limit, query::result::partition_writer& pw) const;
void query(query::result::partition_writer& pw, const schema& s, const query::partition_slice& slice, uint32_t limit = query::max_rows) const;
};

View File

@@ -241,6 +241,8 @@ public:
friend std::ostream& operator<<(std::ostream& out, const partition_slice& ps);
};
constexpr auto max_rows = std::numeric_limits<uint32_t>::max();
// Full specification of a query to the database.
// Intended for passing across replicas.
// Can be accessed across cores.
@@ -250,7 +252,7 @@ public:
partition_slice slice;
uint32_t row_limit;
public:
read_command(const utils::UUID& cf_id, partition_slice slice, uint32_t row_limit)
read_command(const utils::UUID& cf_id, partition_slice slice, uint32_t row_limit = max_rows)
: cf_id(cf_id)
, slice(std::move(slice))
, row_limit(row_limit)

View File

@@ -3,9 +3,31 @@
*/
#include "query-result-set.hh"
#include "query-result-reader.hh"
namespace query {
// Result set builder is passed as a visitor to query_result::consume()
// function. You can call the build() method to obtain a result set that
// contains cells from the visited results.
class result_set_builder {
schema_ptr _schema;
std::vector<result_set_row> _rows;
std::unordered_map<sstring, data_value> _pkey_cells;
public:
result_set_builder(schema_ptr schema);
result_set build() const;
void accept_new_partition(const partition_key& key, uint32_t row_count);
void accept_new_partition(uint32_t row_count);
void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row);
void accept_new_row(const result_row_view &static_row, const result_row_view &row);
void accept_partition_end(const result_row_view& static_row);
private:
std::unordered_map<sstring, data_value> deserialize(const partition_key& key);
std::unordered_map<sstring, data_value> deserialize(const clustering_key& key);
std::unordered_map<sstring, data_value> deserialize(const result_row_view& row, bool is_static);
};
std::ostream& operator<<(std::ostream& out, const result_set_row& row) {
for (auto&& cell : row._cells) {
auto&& type = cell.second.type();
@@ -26,8 +48,8 @@ result_set_builder::result_set_builder(schema_ptr schema)
: _schema{schema}
{ }
lw_shared_ptr<result_set> result_set_builder::build() const {
return make_lw_shared<result_set>(_rows);
result_set result_set_builder::build() const {
return { _schema, _rows };
}
void result_set_builder::accept_new_partition(const partition_key& key, uint32_t row_count)
@@ -119,4 +141,22 @@ result_set_builder::deserialize(const result_row_view& row, bool is_static)
return cells;
}
result_set
result_set::from_raw_result(schema_ptr s, const partition_slice& slice, const result& r) {
auto make = [&slice, s = std::move(s)] (bytes_view v) mutable {
result_set_builder builder{std::move(s)};
result_view view(v);
view.consume(slice, builder);
return builder.build();
};
if (r.buf().is_linearized()) {
return make(r.buf().view());
} else {
// FIXME: make result_view::consume() work on fragments to avoid linearization.
bytes_ostream w(r.buf());
return make(w.linearize());
}
}
}

View File

@@ -4,9 +4,11 @@
#pragma once
#include "query-result-reader.hh"
#include "core/shared_ptr.hh"
#include "query-request.hh"
#include "query-result.hh"
#include "schema.hh"
#include <experimental/optional>
#include <stdexcept>
@@ -39,14 +41,19 @@ public:
return _cells.count(column_name) > 0;
}
// Look up a deserialized row cell value by column name.
template<typename T>
std::experimental::optional<T>
get(const sstring& column_name) const throw (no_such_column) {
const data_value&
get_data_value(const sstring& column_name) const throw (no_such_column) {
auto it = _cells.find(column_name);
if (it == _cells.end()) {
throw no_such_column(column_name);
}
auto&& value = it->second.value();
return it->second;
}
// Look up a deserialized row cell value by column name.
template<typename T>
std::experimental::optional<T>
get(const sstring& column_name) const throw (no_such_column) {
auto&& value = get_data_value(column_name).value();
if (value.empty()) {
return std::experimental::nullopt;
}
@@ -77,10 +84,12 @@ inline bool operator!=(const result_set_row& x, const result_set_row& y) {
// deserialized format. To obtain a result set, use the result_set_builder
// class as a visitor to query_result::consume() function.
class result_set {
schema_ptr _schema;
std::vector<result_set_row> _rows;
public:
result_set(const std::vector<result_set_row>& rows)
: _rows{std::move(rows)}
static result_set from_raw_result(schema_ptr, const partition_slice&, const result&);
result_set(schema_ptr s, const std::vector<result_set_row>& rows)
: _schema(std::move(s)), _rows{std::move(rows)}
{ }
bool empty() const {
return _rows.empty();
@@ -94,6 +103,9 @@ public:
const std::vector<result_set_row>& rows() const {
return _rows;
}
const schema_ptr& schema() const {
return _schema;
}
friend inline bool operator==(const result_set& x, const result_set& y);
friend std::ostream& operator<<(std::ostream& out, const result_set& rs);
};
@@ -102,25 +114,4 @@ inline bool operator==(const result_set& x, const result_set& y) {
return x._rows == y._rows;
}
// Result set builder is passed as a visitor to query_result::consume()
// function. You can call the build() method to obtain a result set that
// contains cells from the visited results.
class result_set_builder {
schema_ptr _schema;
std::vector<result_set_row> _rows;
std::unordered_map<sstring, data_value> _pkey_cells;
public:
result_set_builder(schema_ptr schema);
lw_shared_ptr<result_set> build() const;
void accept_new_partition(const partition_key& key, uint32_t row_count);
void accept_new_partition(uint32_t row_count);
void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row);
void accept_new_row(const result_row_view &static_row, const result_row_view &row);
void accept_partition_end(const result_row_view& static_row);
private:
std::unordered_map<sstring, data_value> deserialize(const partition_key& key);
std::unordered_map<sstring, data_value> deserialize(const clustering_key& key);
std::unordered_map<sstring, data_value> deserialize(const result_row_view& row, bool is_static);
};
}

View File

@@ -33,7 +33,7 @@ public:
};
//
// The query results are stored in a serialized from. This is in order to
// The query results are stored in a serialized form. This is in order to
// address the following problems, which a structured format has:
//
// - high level of indirection (vector of vectors of vectors of blobs), which
@@ -62,7 +62,7 @@ public:
// pass the data using zero-copy to the client, prepending a header.
//
// Users which need more complex structure of query results, should
// trasnform it to such using appropriate visitors.
// transform it to such using appropriate visitors.
// TODO: insert reference to such visitors here.
//
// Query results have dynamic format. In some queries (maybe even in typical

View File

@@ -1620,11 +1620,7 @@ storage_proxy::query_local(const sstring& ks_name, const sstring& cf_name, const
});
});
}).then([this, schema, slice] (auto&& result) {
query::result_set_builder builder{schema};
bytes_ostream w(result->buf());
query::result_view view(w.linearize());
view.consume(slice, builder);
return builder.build();
return make_lw_shared(query::result_set::from_raw_result(schema, slice, *result));
});
}

View File

@@ -33,6 +33,7 @@ public:
}
};
static reader_assertions assert_that(mutation_reader r) {
inline
reader_assertions assert_that(mutation_reader r) {
return { std::move(r) };
}

View File

@@ -4,12 +4,26 @@
#define BOOST_TEST_DYN_LINK
#include "tests/test-utils.hh"
#include <random>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include "core/sstring.hh"
#include "core/do_with.hh"
#include "core/thread.hh"
#include "database.hh"
#include "utils/UUID_gen.hh"
#include "core/do_with.hh"
#include <random>
#include "mutation_reader.hh"
#include "schema_builder.hh"
#include "query-result-set.hh"
#include "query-result-reader.hh"
#include "tests/test-utils.hh"
#include "tests/urchin/mutation_assertions.hh"
#include "tests/urchin/mutation_reader_assertions.hh"
#include "tests/urchin/result_set_assertions.hh"
static sstring some_keyspace("ks");
static sstring some_column_family("cf");
@@ -383,3 +397,54 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
atomic_cell::make_dead(1, expiry_2));
return make_ready_future<>();
}
static query::partition_slice make_full_slice(const schema& s) {
query::partition_slice::option_set options;
options.set<query::partition_slice::option::send_partition_key>();
options.set<query::partition_slice::option::send_clustering_key>();
options.set<query::partition_slice::option::send_timestamp_and_expiry>();
std::vector<query::clustering_range> ranges;
ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
std::vector<column_id> static_columns;
boost::range::push_back(static_columns,
s.static_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
std::vector<column_id> regular_columns;
boost::range::push_back(regular_columns,
s.regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
return {
std::move(ranges),
std::move(static_columns),
std::move(regular_columns),
std::move(options)
};
}
SEASTAR_TEST_CASE(test_querying_of_mutation) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v", bytes_type, column_kind::regular_column)
.build();
auto resultify = [s] (const mutation& m) -> query::result_set {
auto slice = make_full_slice(*s);
return query::result_set::from_raw_result(s, slice, m.query(slice));
};
mutation m(partition_key::from_single_value(*s, "key1"), s);
m.set_clustered_cell(clustering_key::make_empty(*s), "v", bytes("v1"), 1);
assert_that(resultify(m))
.has_only(a_row()
.with_column("pk", bytes("key1"))
.with_column("v", bytes("v1")));
m.partition().apply(tombstone(2, gc_clock::now()));
assert_that(resultify(m)).is_empty();
});
}

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2015 Cloudius Systems
*/
#include <boost/test/unit_test.hpp>
#include "result_set_assertions.hh"
#include "to_string.hh"
static inline
sstring to_sstring(const bytes& b) {
return sstring(b.begin(), b.end());
}
bool
row_assertion::matches(const query::result_set_row& row) const {
for (auto&& column_and_value : _expected_values) {
auto&& name = column_and_value.first;
auto&& value = column_and_value.second;
// FIXME: result_set_row works on sstring column names instead of more general "bytes".
auto ss_name = to_sstring(name);
if (!row.has(ss_name)) {
return false;
}
const data_value& val = row.get_data_value(ss_name);
if (val != data_value(boost::any(value), val.type())) {
return false;
}
}
return true;
}
sstring
row_assertion::describe(schema_ptr schema) const {
return "{" + ::join(", ", _expected_values | boost::adaptors::transformed([&schema] (auto&& e) {
auto&& name = e.first;
auto&& value = e.second;
const column_definition* def = schema->get_column_definition(name);
if (!def) {
BOOST_FAIL(sprint("Schema is missing column definition for '%s'", name));
}
return sprint("%s=\"%s\"", to_sstring(name), def->type->to_string(def->type->decompose(value)));
})) + "}";
}
const result_set_assertions&
result_set_assertions::has(const row_assertion& ra) const {
for (auto&& row : _rs.rows()) {
if (ra.matches(row)) {
return *this;
}
}
BOOST_FAIL(sprint("Row %s not found in %s", ra.describe(_rs.schema()), _rs));
return *this;
}
const result_set_assertions&
result_set_assertions::has_only(const row_assertion& ra) const {
BOOST_REQUIRE(_rs.rows().size() == 1);
auto& row = _rs.rows()[0];
if (!ra.matches(row)) {
BOOST_FAIL(sprint("Expected %s but got %s", ra.describe(_rs.schema()), row));
}
return *this;
}
const result_set_assertions&
result_set_assertions::is_empty() const {
BOOST_REQUIRE_EQUAL(_rs.rows().size(), 0);
return *this;
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include <map>
#include "query-result-set.hh"
//
// Contains assertions for query::result_set objects
//
// Example use:
//
// assert_that(rs)
// .has(a_row().with_column("column_name", "value"));
//
class row_assertion {
std::map<bytes, boost::any> _expected_values;
public:
row_assertion& with_column(bytes name, boost::any value) {
_expected_values.emplace(name, value);
return *this;
}
private:
friend class result_set_assertions;
bool matches(const query::result_set_row& row) const;
sstring describe(schema_ptr s) const;
};
inline
row_assertion a_row() {
return {};
}
class result_set_assertions {
const query::result_set& _rs;
public:
result_set_assertions(const query::result_set& rs) : _rs(rs) { }
const result_set_assertions& has(const row_assertion& ra) const;
const result_set_assertions& has_only(const row_assertion& ra) const;
const result_set_assertions& is_empty() const;
};
// Make rs live as long as the returned assertion object is used
inline
result_set_assertions assert_that(const query::result_set& rs) {
return { rs };
}