diff --git a/configure.py b/configure.py index 575ea2aee2..c5c9383fa4 100755 --- a/configure.py +++ b/configure.py @@ -509,6 +509,7 @@ urchin_core = (['database.cc', urchin_tests_dependencies = urchin_core + http + api + [ 'tests/urchin/cql_test_env.cc', 'tests/urchin/cql_assertions.cc', + 'tests/urchin/result_set_assertions.cc', ] deps = { diff --git a/database.cc b/database.cc index 31231d23ce..4ea9e32d10 100644 --- a/database.cc +++ b/database.cc @@ -1019,7 +1019,7 @@ column_family::query(const query::read_command& cmd, const std::vectorkey()); - mo->partition().query(*_schema, qs.cmd.slice, qs.limit, p_builder); + mo->partition().query(p_builder, *_schema, qs.cmd.slice, qs.limit); p_builder.finish(); qs.limit -= p_builder.row_count(); } else { diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 5d11c623fc..726ef2f93a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1077,11 +1077,7 @@ query(service::storage_proxy& proxy, const sstring& cf_name) { query::partition_slice slice{{query::clustering_range::make_open_ended_both_sides()}, static_cols, regular_cols, opts}; auto cmd = make_lw_shared(schema->id(), slice, std::numeric_limits::max()); return proxy.query(cmd, {query::full_partition_range}, db::consistency_level::ONE).then([schema, cmd] (auto&& result) { - query::result_set_builder builder{schema}; - bytes_ostream w(result->buf()); - query::result_view view(w.linearize()); - view.consume(cmd->slice, builder); - return builder.build(); + return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result)); }); } diff --git a/mutation.cc b/mutation.cc index eb0456c7ef..fbb9a6a5fa 100644 --- a/mutation.cc +++ b/mutation.cc @@ -98,3 +98,12 @@ bool mutation::operator==(const mutation& m) const { bool mutation::operator!=(const mutation& m) const { return !(*this == m); } + +query::result +mutation::query(const query::partition_slice& slice, uint32_t row_limit) const { + query::result::builder builder(slice); + auto pb = builder.add_partition(key()); + _p.query(pb, *_schema, slice, row_limit); + pb.finish(); + return builder.build(); +} diff --git a/mutation.hh b/mutation.hh index 806fffcd84..c5d8af6b45 100644 --- a/mutation.hh +++ b/mutation.hh @@ -41,6 +41,8 @@ public: const utils::UUID& column_family_id() const { return _schema->id(); } bool operator==(const mutation&) const; bool operator!=(const mutation&) const; +public: + query::result query(const query::partition_slice&, uint32_t row_limit = query::max_rows) const; private: friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; diff --git a/mutation_partition.cc b/mutation_partition.cc index ea7b693007..6fe0b5856a 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -270,10 +270,10 @@ bool has_any_live_data(const row& cells, tombstone tomb, ColumnDefResolver&& id_ } void -mutation_partition::query(const schema& s, +mutation_partition::query(query::result::partition_writer& pw, + const schema& s, const query::partition_slice& slice, - uint32_t limit, - query::result::partition_writer& pw) const + uint32_t limit) const { auto regular_column_resolver = [&s] (column_id id) -> const column_definition& { return s.regular_column_at(id); diff --git a/mutation_partition.hh b/mutation_partition.hh index 651f123c69..7b50f601f0 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -273,5 +273,5 @@ public: tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const; boost::iterator_range range(const schema& schema, const query::range& r) const; // Returns at most "limit" rows. The limit must be greater than 0. - void query(const schema& s, const query::partition_slice& slice, uint32_t limit, query::result::partition_writer& pw) const; + void query(query::result::partition_writer& pw, const schema& s, const query::partition_slice& slice, uint32_t limit = query::max_rows) const; }; diff --git a/query-request.hh b/query-request.hh index 7f25bc96c3..cccc934ef6 100644 --- a/query-request.hh +++ b/query-request.hh @@ -241,6 +241,8 @@ public: friend std::ostream& operator<<(std::ostream& out, const partition_slice& ps); }; +constexpr auto max_rows = std::numeric_limits::max(); + // Full specification of a query to the database. // Intended for passing across replicas. // Can be accessed across cores. @@ -250,7 +252,7 @@ public: partition_slice slice; uint32_t row_limit; public: - read_command(const utils::UUID& cf_id, partition_slice slice, uint32_t row_limit) + read_command(const utils::UUID& cf_id, partition_slice slice, uint32_t row_limit = max_rows) : cf_id(cf_id) , slice(std::move(slice)) , row_limit(row_limit) diff --git a/query-result-set.cc b/query-result-set.cc index 0e62fb6ddc..7e9730f720 100644 --- a/query-result-set.cc +++ b/query-result-set.cc @@ -3,9 +3,31 @@ */ #include "query-result-set.hh" +#include "query-result-reader.hh" namespace query { +// Result set builder is passed as a visitor to query_result::consume() +// function. You can call the build() method to obtain a result set that +// contains cells from the visited results. +class result_set_builder { + schema_ptr _schema; + std::vector _rows; + std::unordered_map _pkey_cells; +public: + result_set_builder(schema_ptr schema); + result_set build() const; + void accept_new_partition(const partition_key& key, uint32_t row_count); + void accept_new_partition(uint32_t row_count); + void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row); + void accept_new_row(const result_row_view &static_row, const result_row_view &row); + void accept_partition_end(const result_row_view& static_row); +private: + std::unordered_map deserialize(const partition_key& key); + std::unordered_map deserialize(const clustering_key& key); + std::unordered_map deserialize(const result_row_view& row, bool is_static); +}; + std::ostream& operator<<(std::ostream& out, const result_set_row& row) { for (auto&& cell : row._cells) { auto&& type = cell.second.type(); @@ -26,8 +48,8 @@ result_set_builder::result_set_builder(schema_ptr schema) : _schema{schema} { } -lw_shared_ptr result_set_builder::build() const { - return make_lw_shared(_rows); +result_set result_set_builder::build() const { + return { _schema, _rows }; } void result_set_builder::accept_new_partition(const partition_key& key, uint32_t row_count) @@ -119,4 +141,22 @@ result_set_builder::deserialize(const result_row_view& row, bool is_static) return cells; } +result_set +result_set::from_raw_result(schema_ptr s, const partition_slice& slice, const result& r) { + auto make = [&slice, s = std::move(s)] (bytes_view v) mutable { + result_set_builder builder{std::move(s)}; + result_view view(v); + view.consume(slice, builder); + return builder.build(); + }; + + if (r.buf().is_linearized()) { + return make(r.buf().view()); + } else { + // FIXME: make result_view::consume() work on fragments to avoid linearization. + bytes_ostream w(r.buf()); + return make(w.linearize()); + } +} + } diff --git a/query-result-set.hh b/query-result-set.hh index e2c3f1dbb3..9f35eca9d4 100644 --- a/query-result-set.hh +++ b/query-result-set.hh @@ -4,9 +4,11 @@ #pragma once -#include "query-result-reader.hh" #include "core/shared_ptr.hh" +#include "query-request.hh" +#include "query-result.hh" +#include "schema.hh" #include #include @@ -39,14 +41,19 @@ public: return _cells.count(column_name) > 0; } // Look up a deserialized row cell value by column name. - template - std::experimental::optional - get(const sstring& column_name) const throw (no_such_column) { + const data_value& + get_data_value(const sstring& column_name) const throw (no_such_column) { auto it = _cells.find(column_name); if (it == _cells.end()) { throw no_such_column(column_name); } - auto&& value = it->second.value(); + return it->second; + } + // Look up a deserialized row cell value by column name. + template + std::experimental::optional + get(const sstring& column_name) const throw (no_such_column) { + auto&& value = get_data_value(column_name).value(); if (value.empty()) { return std::experimental::nullopt; } @@ -77,10 +84,12 @@ inline bool operator!=(const result_set_row& x, const result_set_row& y) { // deserialized format. To obtain a result set, use the result_set_builder // class as a visitor to query_result::consume() function. class result_set { + schema_ptr _schema; std::vector _rows; public: - result_set(const std::vector& rows) - : _rows{std::move(rows)} + static result_set from_raw_result(schema_ptr, const partition_slice&, const result&); + result_set(schema_ptr s, const std::vector& rows) + : _schema(std::move(s)), _rows{std::move(rows)} { } bool empty() const { return _rows.empty(); @@ -94,6 +103,9 @@ public: const std::vector& rows() const { return _rows; } + const schema_ptr& schema() const { + return _schema; + } friend inline bool operator==(const result_set& x, const result_set& y); friend std::ostream& operator<<(std::ostream& out, const result_set& rs); }; @@ -102,25 +114,4 @@ inline bool operator==(const result_set& x, const result_set& y) { return x._rows == y._rows; } -// Result set builder is passed as a visitor to query_result::consume() -// function. You can call the build() method to obtain a result set that -// contains cells from the visited results. -class result_set_builder { - schema_ptr _schema; - std::vector _rows; - std::unordered_map _pkey_cells; -public: - result_set_builder(schema_ptr schema); - lw_shared_ptr build() const; - void accept_new_partition(const partition_key& key, uint32_t row_count); - void accept_new_partition(uint32_t row_count); - void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row); - void accept_new_row(const result_row_view &static_row, const result_row_view &row); - void accept_partition_end(const result_row_view& static_row); -private: - std::unordered_map deserialize(const partition_key& key); - std::unordered_map deserialize(const clustering_key& key); - std::unordered_map deserialize(const result_row_view& row, bool is_static); -}; - } diff --git a/query-result.hh b/query-result.hh index 6a7a26aeb7..c542868553 100644 --- a/query-result.hh +++ b/query-result.hh @@ -33,7 +33,7 @@ public: }; // -// The query results are stored in a serialized from. This is in order to +// The query results are stored in a serialized form. This is in order to // address the following problems, which a structured format has: // // - high level of indirection (vector of vectors of vectors of blobs), which @@ -62,7 +62,7 @@ public: // pass the data using zero-copy to the client, prepending a header. // // Users which need more complex structure of query results, should -// trasnform it to such using appropriate visitors. +// transform it to such using appropriate visitors. // TODO: insert reference to such visitors here. // // Query results have dynamic format. In some queries (maybe even in typical diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 075d7bbee3..1516e68578 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1620,11 +1620,7 @@ storage_proxy::query_local(const sstring& ks_name, const sstring& cf_name, const }); }); }).then([this, schema, slice] (auto&& result) { - query::result_set_builder builder{schema}; - bytes_ostream w(result->buf()); - query::result_view view(w.linearize()); - view.consume(slice, builder); - return builder.build(); + return make_lw_shared(query::result_set::from_raw_result(schema, slice, *result)); }); } diff --git a/tests/urchin/mutation_reader_assertions.hh b/tests/urchin/mutation_reader_assertions.hh index 8a5f47c0c6..d06bc054c2 100644 --- a/tests/urchin/mutation_reader_assertions.hh +++ b/tests/urchin/mutation_reader_assertions.hh @@ -33,6 +33,7 @@ public: } }; -static reader_assertions assert_that(mutation_reader r) { +inline +reader_assertions assert_that(mutation_reader r) { return { std::move(r) }; } diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 50b672809c..622c251f27 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -4,12 +4,26 @@ #define BOOST_TEST_DYN_LINK -#include "tests/test-utils.hh" +#include +#include +#include +#include + #include "core/sstring.hh" +#include "core/do_with.hh" +#include "core/thread.hh" + #include "database.hh" #include "utils/UUID_gen.hh" -#include "core/do_with.hh" -#include +#include "mutation_reader.hh" +#include "schema_builder.hh" +#include "query-result-set.hh" +#include "query-result-reader.hh" + +#include "tests/test-utils.hh" +#include "tests/urchin/mutation_assertions.hh" +#include "tests/urchin/mutation_reader_assertions.hh" +#include "tests/urchin/result_set_assertions.hh" static sstring some_keyspace("ks"); static sstring some_column_family("cf"); @@ -383,3 +397,54 @@ SEASTAR_TEST_CASE(test_cell_ordering) { atomic_cell::make_dead(1, expiry_2)); return make_ready_future<>(); } + +static query::partition_slice make_full_slice(const schema& s) { + query::partition_slice::option_set options; + options.set(); + options.set(); + options.set(); + + std::vector ranges; + ranges.emplace_back(query::clustering_range::make_open_ended_both_sides()); + + std::vector static_columns; + boost::range::push_back(static_columns, + s.static_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id))); + + std::vector regular_columns; + boost::range::push_back(regular_columns, + s.regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id))); + + return { + std::move(ranges), + std::move(static_columns), + std::move(regular_columns), + std::move(options) + }; +} + +SEASTAR_TEST_CASE(test_querying_of_mutation) { + return seastar::async([] { + auto s = schema_builder("ks", "cf") + .with_column("pk", bytes_type, column_kind::partition_key) + .with_column("v", bytes_type, column_kind::regular_column) + .build(); + + auto resultify = [s] (const mutation& m) -> query::result_set { + auto slice = make_full_slice(*s); + return query::result_set::from_raw_result(s, slice, m.query(slice)); + }; + + mutation m(partition_key::from_single_value(*s, "key1"), s); + m.set_clustered_cell(clustering_key::make_empty(*s), "v", bytes("v1"), 1); + + assert_that(resultify(m)) + .has_only(a_row() + .with_column("pk", bytes("key1")) + .with_column("v", bytes("v1"))); + + m.partition().apply(tombstone(2, gc_clock::now())); + + assert_that(resultify(m)).is_empty(); + }); +} diff --git a/tests/urchin/result_set_assertions.cc b/tests/urchin/result_set_assertions.cc new file mode 100644 index 0000000000..1bdebf2efc --- /dev/null +++ b/tests/urchin/result_set_assertions.cc @@ -0,0 +1,73 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include + +#include "result_set_assertions.hh" +#include "to_string.hh" + +static inline +sstring to_sstring(const bytes& b) { + return sstring(b.begin(), b.end()); +} + +bool +row_assertion::matches(const query::result_set_row& row) const { + for (auto&& column_and_value : _expected_values) { + auto&& name = column_and_value.first; + auto&& value = column_and_value.second; + + // FIXME: result_set_row works on sstring column names instead of more general "bytes". + auto ss_name = to_sstring(name); + + if (!row.has(ss_name)) { + return false; + } + const data_value& val = row.get_data_value(ss_name); + if (val != data_value(boost::any(value), val.type())) { + return false; + } + } + return true; +} + +sstring +row_assertion::describe(schema_ptr schema) const { + return "{" + ::join(", ", _expected_values | boost::adaptors::transformed([&schema] (auto&& e) { + auto&& name = e.first; + auto&& value = e.second; + const column_definition* def = schema->get_column_definition(name); + if (!def) { + BOOST_FAIL(sprint("Schema is missing column definition for '%s'", name)); + } + return sprint("%s=\"%s\"", to_sstring(name), def->type->to_string(def->type->decompose(value))); + })) + "}"; +} + +const result_set_assertions& +result_set_assertions::has(const row_assertion& ra) const { + for (auto&& row : _rs.rows()) { + if (ra.matches(row)) { + return *this; + } + } + BOOST_FAIL(sprint("Row %s not found in %s", ra.describe(_rs.schema()), _rs)); + return *this; +} + +const result_set_assertions& +result_set_assertions::has_only(const row_assertion& ra) const { + BOOST_REQUIRE(_rs.rows().size() == 1); + auto& row = _rs.rows()[0]; + if (!ra.matches(row)) { + BOOST_FAIL(sprint("Expected %s but got %s", ra.describe(_rs.schema()), row)); + } + return *this; +} + +const result_set_assertions& +result_set_assertions::is_empty() const { + BOOST_REQUIRE_EQUAL(_rs.rows().size(), 0); + return *this; +} diff --git a/tests/urchin/result_set_assertions.hh b/tests/urchin/result_set_assertions.hh new file mode 100644 index 0000000000..6d04cb4685 --- /dev/null +++ b/tests/urchin/result_set_assertions.hh @@ -0,0 +1,51 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include + +#include "query-result-set.hh" + +// +// Contains assertions for query::result_set objects +// +// Example use: +// +// assert_that(rs) +// .has(a_row().with_column("column_name", "value")); +// + +class row_assertion { + std::map _expected_values; +public: + row_assertion& with_column(bytes name, boost::any value) { + _expected_values.emplace(name, value); + return *this; + } +private: + friend class result_set_assertions; + bool matches(const query::result_set_row& row) const; + sstring describe(schema_ptr s) const; +}; + +inline +row_assertion a_row() { + return {}; +} + +class result_set_assertions { + const query::result_set& _rs; +public: + result_set_assertions(const query::result_set& rs) : _rs(rs) { } + const result_set_assertions& has(const row_assertion& ra) const; + const result_set_assertions& has_only(const row_assertion& ra) const; + const result_set_assertions& is_empty() const; +}; + +// Make rs live as long as the returned assertion object is used +inline +result_set_assertions assert_that(const query::result_set& rs) { + return { rs }; +}